Skip to content

Commit

Permalink
fix com
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jun 20, 2024
1 parent ab6422a commit 267e6c9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
20 changes: 1 addition & 19 deletions src/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,9 @@ pub fn stream_enforce_eowc_requirement(
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct RewriteStreamContext {
share_rewrite_map: HashMap<PlanNodeId, (PlanRef, ColIndexMapping)>,
with_stream_key: bool,
}

impl Default for RewriteStreamContext {
fn default() -> Self {
Self {
share_rewrite_map: HashMap::default(),
with_stream_key: true,
}
}
}

impl RewriteStreamContext {
Expand All @@ -128,14 +118,6 @@ impl RewriteStreamContext {
) -> Option<&(PlanRef, ColIndexMapping)> {
self.share_rewrite_map.get(&plan_node_id)
}

pub fn set_with_stream_key(&mut self, with_stream_key: bool) {
self.with_stream_key = with_stream_key;
}

pub fn get_with_stream_key(&self) -> bool {
self.with_stream_key
}
}

#[derive(Debug, Clone)]
Expand Down
24 changes: 20 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;

use super::expr_visitable::ExprVisitable;
use super::generic::{GenericPlanRef, CHANGE_LOG_OP, _CHANGE_LOG_ROW_ID};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical,
PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, StreamChangeLog,
StreamRowIdGen, ToBatch, ToStream, ToStreamContext,
LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext,
StreamChangeLog, StreamRowIdGen, ToBatch, ToStream, ToStreamContext,
};
use crate::error::ErrorCode::BindError;
use crate::error::Result;
use crate::expr::{ExprImpl, InputRef};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMapping, Condition};
use crate::PlanRef;
Expand Down Expand Up @@ -149,9 +152,22 @@ impl ToStream for LogicalChangeLog {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
ctx.set_with_stream_key(false);
let original_schema = self.input().schema().clone();
let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
let (change_log, out_col_change) = self.rewrite_with_input(input, input_col_change);
let exprs = (0..original_schema.len())
.map(|x| {
ExprImpl::InputRef(
InputRef::new(
input_col_change.map(x),
original_schema.fields[x].data_type.clone(),
)
.into(),
)
})
.collect_vec();
let project = LogicalProject::new(input.clone(), exprs);
let (project, out_col_change) = project.rewrite_with_input(input, input_col_change);
let (change_log, out_col_change) = self.rewrite_with_input(project.into(), out_col_change);
Ok((change_log.into(), out_col_change))
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,9 @@ impl ToStream for LogicalScan {

fn logical_rewrite_for_stream(
&self,
ctx: &mut RewriteStreamContext,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
match self.base.stream_key().is_none() && ctx.get_with_stream_key() {
match self.base.stream_key().is_none() {
true => {
let mut col_ids = HashSet::new();

Expand Down

0 comments on commit 267e6c9

Please sign in to comment.