diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index c4b3e949fe4f..223a23813f68 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -278,6 +278,9 @@ message ProjectNode { repeated uint32 watermark_input_cols = 2; repeated uint32 watermark_output_cols = 3; repeated uint32 nondecreasing_exprs = 4; + // Whether there are likely no-op updates in the output chunks, so that eliminating them with + // `StreamChunk::eliminate_adjacent_noop_update` could be beneficial. + bool noop_update_hint = 5; } message FilterNode { diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index 154680c8acbc..70f83549ff18 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -281,12 +281,14 @@ select distinct v1 from t; expected_outputs: - logical_plan + - stream_plan - name: distinct with agg sql: | create table t (v1 int, v2 int); select distinct sum(v1) from t group by v2; expected_outputs: - logical_plan + - stream_plan - name: distinct on sql: | create table t (v1 int, v2 int, v3 int); diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index 9a07df7558d9..4c75b8318774 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -451,6 +451,12 @@ LogicalAgg { group_key: [t.v1], aggs: [] } └─LogicalProject { exprs: [t.v1] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + stream_plan: |- + StreamMaterialize { columns: [v1], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.v1], noop_update_hint: true } + └─StreamHashAgg { group_key: [t.v1], aggs: [count] } + └─StreamExchange { dist: HashShard(t.v1) } + └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: distinct with agg sql: | create table t (v1 int, v2 int); @@ -461,6 +467,15 @@ └─LogicalAgg { group_key: [t.v2], aggs: [sum(t.v1)] } └─LogicalProject { exprs: [t.v2, t.v1] } └─LogicalScan { table: t, columns: [t.v1, t.v2, t._row_id] } + stream_plan: |- + StreamMaterialize { columns: [sum], stream_key: [sum], pk_columns: [sum], pk_conflict: NoCheck } + └─StreamProject { exprs: [sum(t.v1)], noop_update_hint: true } + └─StreamHashAgg { group_key: [sum(t.v1)], aggs: [count] } + └─StreamExchange { dist: HashShard(sum(t.v1)) } + └─StreamProject { exprs: [t.v2, sum(t.v1)] } + └─StreamHashAgg { group_key: [t.v2], aggs: [sum(t.v1), count] } + └─StreamExchange { dist: HashShard(t.v2) } + └─StreamTableScan { table: t, columns: [t.v1, t.v2, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } - name: distinct on sql: | create table t (v1 int, v2 int, v3 int); @@ -576,7 +591,7 @@ └─LogicalScan { table: t, columns: [t.v1] } stream_plan: |- StreamMaterialize { columns: [v1], stream_key: [v1], pk_columns: [v1], pk_conflict: NoCheck } - └─StreamProject { exprs: [t.v1] } + └─StreamProject { exprs: [t.v1], noop_update_hint: true } └─StreamHashAgg { group_key: [t.v1], aggs: [count] } └─StreamExchange { dist: HashShard(t.v1) } └─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1797,7 +1812,7 @@ stream_plan: |- StreamMaterialize { columns: [a, row_number], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.a ORDER BY t.a DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamProject { exprs: [t.a] } + └─StreamProject { exprs: [t.a], noop_update_hint: true } └─StreamHashAgg { group_key: [t.a], aggs: [count] } └─StreamExchange { dist: HashShard(t.a) } └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/except.yaml b/src/frontend/planner_test/tests/testdata/output/except.yaml index 58b4f757955c..d19ec9882c42 100644 --- a/src/frontend/planner_test/tests/testdata/output/except.yaml +++ b/src/frontend/planner_test/tests/testdata/output/except.yaml @@ -25,7 +25,7 @@ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } + └─StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } @@ -35,7 +35,7 @@ stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } + └── StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { tables: [ HashAggState: 0 ] } └── StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├── tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ] diff --git a/src/frontend/planner_test/tests/testdata/output/intersect.yaml b/src/frontend/planner_test/tests/testdata/output/intersect.yaml index 87834b2ca2d2..678091bbb2e4 100644 --- a/src/frontend/planner_test/tests/testdata/output/intersect.yaml +++ b/src/frontend/planner_test/tests/testdata/output/intersect.yaml @@ -25,7 +25,7 @@ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } + └─StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } @@ -35,7 +35,7 @@ stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } + └── StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { tables: [ HashAggState: 0 ] } └── StreamHashJoin { type: LeftSemi, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all } ├── tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ] diff --git a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml index acac3cc23699..39639b3ebb64 100644 --- a/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/lateral_subquery.yaml @@ -164,7 +164,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -193,7 +193,7 @@ │ │ └─StreamTableScan { table: t1, columns: [t1.n, t1.id, t1._row_id, t1.c], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: HashShard(Unnest(Case(($1 <> '':Varchar), ArrayAppend(StringToArray(Trim($1, ',':Varchar), ',':Varchar), $3), Array($3)))) } │ └─StreamProjectSet { select_list: [$0, $1, $2, $3, Unnest(Case(($1 <> '':Varchar), ArrayAppend(StringToArray(Trim($1, ',':Varchar), ',':Varchar), $3), Array($3)))] } - │ └─StreamProject { exprs: [t2.p, t2.p, t2.d, t2.d] } + │ └─StreamProject { exprs: [t2.p, t2.p, t2.d, t2.d], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t2.p, t2.p, t2.d, t2.d], aggs: [count] } │ └─StreamExchange { dist: HashShard(t2.p, t2.p, t2.d, t2.d) } │ └─StreamProject { exprs: [t2.p, t2.p, t2.d, t2.d, t2._row_id] } @@ -226,7 +226,7 @@ └─StreamHashAgg { group_key: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5], aggs: [sum($expr4) filter(((t2.c7 * t1.c3) <= (t2.c7 * t2.c8))), sum(t1.c2) filter(((t2.c7 * t1.c3) <= (t2.c7 * t2.c8))), count] } └─StreamHashJoin { type: LeftOuter, predicate: t2.c7 IS NOT DISTINCT FROM t2.c7 AND t2.c7 IS NOT DISTINCT FROM t2.c7 AND t2.c8 IS NOT DISTINCT FROM t2.c8 AND t2.c7 IS NOT DISTINCT FROM t2.c7 AND t2.c7 IS NOT DISTINCT FROM t2.c7 AND t2.c8 IS NOT DISTINCT FROM t2.c8 AND t2.c1 IS NOT DISTINCT FROM t2.c1 AND t2.c5 IS NOT DISTINCT FROM t2.c5 AND t2.c5 IS NOT DISTINCT FROM t2.c5, output: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, t1.c3, $expr4, t1.c2, t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, t1._row_id, $expr1] } ├─StreamExchange { dist: HashShard(t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5) } - │ └─StreamProject { exprs: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5] } + │ └─StreamProject { exprs: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5], aggs: [count] } │ └─StreamExchange { dist: HashShard(t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5) } │ └─StreamTableScan { table: t2, columns: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } @@ -235,7 +235,7 @@ └─StreamFilter { predicate: (t2.c5 <= $expr2) } └─StreamHashJoin { type: Inner, predicate: t2.c1 = t1.c1 AND $expr1 = $expr3, output: all } ├─StreamExchange { dist: HashShard(t2.c1, $expr1) } - │ └─StreamProject { exprs: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, AtTimeZone(t2.c5, 'UTC':Varchar)::Date as $expr1] } + │ └─StreamProject { exprs: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, AtTimeZone(t2.c5, 'UTC':Varchar)::Date as $expr1], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5], aggs: [count] } │ └─StreamExchange { dist: HashShard(t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5) } │ └─StreamTableScan { table: t2, columns: [t2.c7, t2.c7, t2.c8, t2.c7, t2.c7, t2.c8, t2.c1, t2.c5, t2.c5, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 41007d8de118..298653450f65 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -748,7 +748,7 @@ │ └─StreamProject { exprs: [person.id, person.name, $expr1, ($expr1 + '00:00:10':Interval) as $expr2] } │ └─StreamProject { exprs: [person.id, person.name, person.date_time, TumbleStart(person.date_time, '00:00:10':Interval) as $expr1] } │ └─StreamTableScan { table: person, columns: [person.id, person.name, person.date_time], stream_scan_type: ArrangementBackfill, stream_key: [person.id], pk: [id], dist: UpstreamHashShard(person.id) } - └─StreamProject { exprs: [auction.seller, $expr3, $expr4] } + └─StreamProject { exprs: [auction.seller, $expr3, $expr4], noop_update_hint: true } └─StreamHashAgg { group_key: [auction.seller, $expr3, $expr4], aggs: [count] } └─StreamExchange { dist: HashShard(auction.seller, $expr3, $expr4) } └─StreamProject { exprs: [auction.seller, $expr3, ($expr3 + '00:00:10':Interval) as $expr4, auction.id] } @@ -764,7 +764,7 @@ StreamHashJoin { type: Inner, predicate: person.id = auction.seller AND $expr1 = $expr3 AND $expr2 = $expr4, output: [person.id, internal_last_seen_value(person.name), $expr1, $expr2, auction.seller, $expr3, $expr4] } ├── tables: [ HashJoinLeft: 0, HashJoinDegreeLeft: 1, HashJoinRight: 2, HashJoinDegreeRight: 3 ] ├── StreamExchange Hash([0, 1, 2]) from 2 - └── StreamProject { exprs: [auction.seller, $expr3, $expr4] } + └── StreamProject { exprs: [auction.seller, $expr3, $expr4], noop_update_hint: true } └── StreamHashAgg { group_key: [auction.seller, $expr3, $expr4], aggs: [count] } { tables: [ HashAggState: 6 ] } └── StreamExchange Hash([0, 1, 2]) from 3 diff --git a/src/frontend/planner_test/tests/testdata/output/pk_derive.yaml b/src/frontend/planner_test/tests/testdata/output/pk_derive.yaml index 5c8e87aea559..7392c88bde3c 100644 --- a/src/frontend/planner_test/tests/testdata/output/pk_derive.yaml +++ b/src/frontend/planner_test/tests/testdata/output/pk_derive.yaml @@ -77,7 +77,7 @@ └─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3] } stream_plan: |- StreamMaterialize { columns: [v1, v2, v3], stream_key: [v1, v2, v3], pk_columns: [v1, v2, v3], pk_conflict: NoCheck } - └─StreamProject { exprs: [t.v1, t.v2, t.v3] } + └─StreamProject { exprs: [t.v1, t.v2, t.v3], noop_update_hint: true } └─StreamHashAgg { group_key: [t.v1, t.v2, t.v3], aggs: [count] } └─StreamExchange { dist: HashShard(t.v1, t.v2, t.v3) } └─StreamTableScan { table: t, columns: [t.v1, t.v2, t.v3, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/project_set.yaml b/src/frontend/planner_test/tests/testdata/output/project_set.yaml index 890480493af9..8373ea5187fc 100644 --- a/src/frontend/planner_test/tests/testdata/output/project_set.yaml +++ b/src/frontend/planner_test/tests/testdata/output/project_set.yaml @@ -118,7 +118,7 @@ └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [unnest], stream_key: [unnest], pk_columns: [unnest], pk_conflict: NoCheck } - └─StreamProject { exprs: [Unnest($0)] } + └─StreamProject { exprs: [Unnest($0)], noop_update_hint: true } └─StreamHashAgg { group_key: [Unnest($0)], aggs: [count] } └─StreamExchange { dist: HashShard(Unnest($0)) } └─StreamProjectSet { select_list: [Unnest($0), $1] } diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index cfe5d841cb2c..2cf3aee9fe04 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -222,12 +222,12 @@ └─StreamStatelessSimpleAgg { aggs: [count] } └─StreamHashJoin { type: Inner, predicate: t.a = t.a, output: all } ├─StreamShare { id: 4 } - │ └─StreamProject { exprs: [t.a] } + │ └─StreamProject { exprs: [t.a], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t.a], aggs: [count] } │ └─StreamExchange { dist: HashShard(t.a) } │ └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamShare { id: 4 } - └─StreamProject { exprs: [t.a] } + └─StreamProject { exprs: [t.a], noop_update_hint: true } └─StreamHashAgg { group_key: [t.a], aggs: [count] } └─StreamExchange { dist: HashShard(t.a) } └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -251,7 +251,7 @@ └── StreamExchange NoShuffle from 4 Fragment 2 - StreamProject { exprs: [t.a] } + StreamProject { exprs: [t.a], noop_update_hint: true } └── StreamHashAgg { group_key: [t.a], aggs: [count] } { tables: [ HashAggState: 5 ] } └── StreamExchange Hash([0]) from 3 diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index b981f8f968ed..f9ae7cdee3f0 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -331,9 +331,9 @@ │ └─StreamHopWindow { time_col: auction.date_time, slide: 00:00:01, size: 01:00:00, output: [auction.date_time, window_start, window_end, auction._row_id] } │ └─StreamFilter { predicate: IsNotNull(auction.date_time) } │ └─StreamTableScan { table: auction, columns: [auction.date_time, auction._row_id], stream_scan_type: ArrangementBackfill, stream_key: [auction._row_id], pk: [_row_id], dist: UpstreamHashShard(auction._row_id) } - └─StreamProject { exprs: [auction.date_time] } + └─StreamProject { exprs: [auction.date_time], noop_update_hint: true } └─StreamHashAgg { group_key: [auction.date_time], aggs: [count] } - └─StreamProject { exprs: [auction.date_time] } + └─StreamProject { exprs: [auction.date_time], noop_update_hint: true } └─StreamHashAgg { group_key: [auction.date_time], aggs: [count] } └─StreamExchange { dist: HashShard(auction.date_time) } └─StreamShare { id: 3 } @@ -529,13 +529,13 @@ └─StreamProject { exprs: [t.x, sum(Unnest($0))] } └─StreamHashAgg { group_key: [t.x], aggs: [sum(Unnest($0)), count] } └─StreamHashJoin { type: LeftOuter, predicate: t.x IS NOT DISTINCT FROM t.x, output: [t.x, Unnest($0), t.x, projected_row_id] } - ├─StreamProject { exprs: [t.x] } + ├─StreamProject { exprs: [t.x], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t.x], aggs: [count] } │ └─StreamExchange { dist: HashShard(t.x) } │ └─StreamTableScan { table: t, columns: [t.x, t.k], stream_scan_type: ArrangementBackfill, stream_key: [t.k], pk: [k], dist: UpstreamHashShard(t.k) } └─StreamProject { exprs: [t.x, Unnest($0), projected_row_id] } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.x] } + └─StreamProject { exprs: [t.x], noop_update_hint: true } └─StreamHashAgg { group_key: [t.x], aggs: [count] } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t.k], stream_scan_type: ArrangementBackfill, stream_key: [t.k], pk: [k], dist: UpstreamHashShard(t.k) } @@ -817,7 +817,7 @@ └─StreamProject { exprs: [integers.correlated_col, (count(distinct rows.k) + count(distinct rows.v)) as $expr1] } └─StreamHashAgg { group_key: [integers.correlated_col], aggs: [count(distinct rows.k), count(distinct rows.v), count] } └─StreamHashJoin { type: LeftOuter, predicate: integers.correlated_col IS NOT DISTINCT FROM rows.correlated_col, output: [integers.correlated_col, rows.k, rows.v, rows._row_id] } - ├─StreamProject { exprs: [integers.correlated_col] } + ├─StreamProject { exprs: [integers.correlated_col], noop_update_hint: true } │ └─StreamHashAgg { group_key: [integers.correlated_col], aggs: [count] } │ └─StreamExchange { dist: HashShard(integers.correlated_col) } │ └─StreamTableScan { table: integers, columns: [integers.correlated_col, integers._row_id], stream_scan_type: ArrangementBackfill, stream_key: [integers._row_id], pk: [_row_id], dist: UpstreamHashShard(integers._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml index 3bb2639f1293..09905835f200 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery_expr_correlated.yaml @@ -1096,7 +1096,7 @@ └─StreamProject { exprs: [t.b, count(1:Int32)] } └─StreamHashAgg { group_key: [t.b], aggs: [count(1:Int32), count] } └─StreamHashJoin { type: LeftOuter, predicate: t.b IS NOT DISTINCT FROM t2.d, output: [t.b, 1:Int32, t2._row_id] } - ├─StreamProject { exprs: [t.b] } + ├─StreamProject { exprs: [t.b], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t.b], aggs: [count] } │ └─StreamExchange { dist: HashShard(t.b) } │ └─StreamTableScan { table: t, columns: [t.b, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -1331,7 +1331,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1340,7 +1340,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1349,7 +1349,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1358,7 +1358,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1367,7 +1367,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1376,7 +1376,7 @@ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1385,7 +1385,7 @@ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1394,7 +1394,7 @@ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1403,7 +1403,7 @@ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1412,7 +1412,7 @@ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1421,7 +1421,7 @@ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1430,7 +1430,7 @@ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1439,7 +1439,7 @@ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1448,7 +1448,7 @@ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1457,7 +1457,7 @@ │ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1466,7 +1466,7 @@ │ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1475,7 +1475,7 @@ │ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1484,7 +1484,7 @@ └─StreamProject { exprs: [t1.a, t1.b, t1.b] } └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } ├─StreamExchange { dist: HashShard(t1.a) } - │ └─StreamProject { exprs: [t1.a, t1.b] } + │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1802,7 +1802,7 @@ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } │ │ │ │ │ └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1814,7 +1814,7 @@ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1822,7 +1822,7 @@ │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1830,7 +1830,7 @@ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1838,7 +1838,7 @@ │ │ │ │ │ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1846,7 +1846,7 @@ │ │ │ │ │ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1856,7 +1856,7 @@ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } │ │ │ │ └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1868,7 +1868,7 @@ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1876,7 +1876,7 @@ │ │ │ │ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1884,7 +1884,7 @@ │ │ │ │ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1892,7 +1892,7 @@ │ │ │ │ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1900,7 +1900,7 @@ │ │ │ │ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1910,7 +1910,7 @@ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } │ │ │ └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1922,7 +1922,7 @@ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1930,7 +1930,7 @@ │ │ │ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1938,7 +1938,7 @@ │ │ │ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1946,7 +1946,7 @@ │ │ │ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1954,7 +1954,7 @@ │ │ │ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } │ │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1964,7 +1964,7 @@ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } │ │ └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1976,7 +1976,7 @@ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1984,7 +1984,7 @@ │ │ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -1992,7 +1992,7 @@ │ │ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2000,7 +2000,7 @@ │ │ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2008,7 +2008,7 @@ │ │ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } │ │ └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } │ │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2018,7 +2018,7 @@ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } │ └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2030,7 +2030,7 @@ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2038,7 +2038,7 @@ │ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2046,7 +2046,7 @@ │ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2054,7 +2054,7 @@ │ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2062,7 +2062,7 @@ │ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } │ └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } │ ├─StreamExchange { dist: HashShard(t1.b) } - │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2072,7 +2072,7 @@ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [sum(t2.d), count] } └─StreamHashJoin { type: LeftOuter, predicate: t1.a IS NOT DISTINCT FROM t1.a AND t1.b IS NOT DISTINCT FROM t1.b, output: [t1.a, t1.b, t2.d, t1.a, t1.b, t3._row_id, t2.c, t4._row_id, t5._row_id, t5.j] } ├─StreamExchange { dist: HashShard(t1.b) } - │ └─StreamProject { exprs: [t1.a, t1.b] } + │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2084,7 +2084,7 @@ │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } │ │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.c, output: all } │ │ │ │ ├─StreamExchange { dist: HashShard(t1.a) } - │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2092,7 +2092,7 @@ │ │ │ │ └─StreamTableScan { table: t2, columns: [t2.c, t2.d], stream_scan_type: ArrangementBackfill, stream_key: [t2.c], pk: [c], dist: UpstreamHashShard(t2.c) } │ │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t3.e AND t1.a = t3.f, output: [t1.a, t1.b, t3.f, t3._row_id] } │ │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2100,7 +2100,7 @@ │ │ │ └─StreamTableScan { table: t3, columns: [t3.e, t3.f, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) } │ │ └─StreamHashJoin { type: Inner, predicate: t1.a = t4.g AND t1.a = t4.h, output: [t1.a, t1.b, t4.h, t4._row_id] } │ │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2108,7 +2108,7 @@ │ │ └─StreamTableScan { table: t4, columns: [t4.g, t4.h, t4._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t4._row_id], pk: [_row_id], dist: UpstreamHashShard(t4._row_id) } │ └─StreamHashJoin { type: Inner, predicate: t1.a = t5.i AND t1.a = t5.j, output: [t1.a, t1.b, t5.j, t5._row_id] } │ ├─StreamExchange { dist: HashShard(t1.a, t1.a) } - │ │ └─StreamProject { exprs: [t1.a, t1.b] } + │ │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } @@ -2116,7 +2116,7 @@ │ └─StreamTableScan { table: t5, columns: [t5.i, t5.j, t5._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t5._row_id], pk: [_row_id], dist: UpstreamHashShard(t5._row_id) } └─StreamHashJoin { type: Inner, predicate: t1.b = t6.l, output: [t1.a, t1.b, t6.k, t6._row_id] } ├─StreamExchange { dist: HashShard(t1.b) } - │ └─StreamProject { exprs: [t1.a, t1.b] } + │ └─StreamProject { exprs: [t1.a, t1.b], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } │ └─StreamExchange { dist: HashShard(t1.a, t1.b) } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/union.yaml b/src/frontend/planner_test/tests/testdata/output/union.yaml index c9591f2dee2d..ffd31dec73da 100644 --- a/src/frontend/planner_test/tests/testdata/output/union.yaml +++ b/src/frontend/planner_test/tests/testdata/output/union.yaml @@ -78,7 +78,7 @@ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } + └─StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } └─StreamUnion { all: true } @@ -92,7 +92,7 @@ Fragment 0 StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } + └── StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { tables: [ HashAggState: 0 ] } └── StreamExchange Hash([0, 1, 2]) from 1 @@ -155,7 +155,7 @@ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: UpstreamHashShard(t2.a) } stream_plan: |- StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } - └─StreamProject { exprs: [t1.a, t1.b, t1.c] } + └─StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } └─StreamUnion { all: true } @@ -169,7 +169,7 @@ Fragment 0 StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } ├── tables: [ Materialize: 4294967294 ] - └── StreamProject { exprs: [t1.a, t1.b, t1.c] } + └── StreamProject { exprs: [t1.a, t1.b, t1.c], noop_update_hint: true } └── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { tables: [ HashAggState: 0 ] } └── StreamExchange Hash([0, 1, 2]) from 1 diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml index be7c3a19a566..1b811a487199 100644 --- a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -30,7 +30,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -54,7 +54,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -78,7 +78,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -102,7 +102,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -127,7 +127,7 @@ ├─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -161,12 +161,12 @@ │ ├─StreamExchange { dist: HashShard(t.arr) } │ │ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } │ └─StreamProjectSet { select_list: [$0, Unnest($0)] } - │ └─StreamProject { exprs: [t.arr] } + │ └─StreamProject { exprs: [t.arr], noop_update_hint: true } │ └─StreamHashAgg { group_key: [t.arr], aggs: [count] } │ └─StreamExchange { dist: HashShard(t.arr) } │ └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } └─StreamProjectSet { select_list: [$0, Unnest($0)] } - └─StreamProject { exprs: [t.arr] } + └─StreamProject { exprs: [t.arr], noop_update_hint: true } └─StreamHashAgg { group_key: [t.arr], aggs: [count] } └─StreamExchange { dist: HashShard(t.arr) } └─StreamTableScan { table: t, columns: [t.arr, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 6d4909802790..f32b37d250b5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -1140,10 +1140,16 @@ impl ToStream for LogicalAgg { Ok(plan) } else { // a `count(*)` is appended, should project the output + assert_eq!(self.agg_calls().len() + 1, n_final_agg_calls); Ok(StreamProject::new(generic::Project::with_out_col_idx( plan, 0..self.schema().len(), )) + // If there's no agg call, then `count(*)` will be the only column in the output besides keys. + // Since it'll be pruned immediately in `StreamProject`, the update records are likely to be + // no-op. So we set the hint to instruct the executor to eliminate them. + // See https://github.com/risingwavelabs/risingwave/issues/17030. + .with_noop_update_hint(self.agg_calls().is_empty()) .into()) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index c077af9241aa..e8ff1df6e82d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -38,10 +38,15 @@ pub struct StreamProject { watermark_derivations: Vec<(usize, usize)>, /// Nondecreasing expression indices. `Project` can produce watermarks for these expressions. nondecreasing_exprs: Vec, + /// Whether there are likely no-op updates in the output chunks, so that eliminating them with + /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial. + noop_update_hint: bool, } impl Distill for StreamProject { fn distill<'a>(&self) -> XmlNode<'a> { + let verbose = self.base.ctx().is_explain_verbose(); + let schema = self.schema(); let mut vec = self.core.fields_pretty(schema); if let Some(display_output_watermarks) = @@ -49,12 +54,27 @@ impl Distill for StreamProject { { vec.push(("output_watermarks", display_output_watermarks)); } + if verbose && self.noop_update_hint { + vec.push(("noop_update_hint", "true".into())); + } childless_record("StreamProject", vec) } } impl StreamProject { pub fn new(core: generic::Project) -> Self { + Self::new_inner(core, false) + } + + /// Set the `noop_update_hint` flag to the given value. + pub fn with_noop_update_hint(self, noop_update_hint: bool) -> Self { + Self { + noop_update_hint, + ..self + } + } + + fn new_inner(core: generic::Project, noop_update_hint: bool) -> Self { let input = core.input.clone(); let distribution = core .i2o_col_mapping() @@ -90,11 +110,13 @@ impl StreamProject { input.emit_on_window_close(), watermark_columns, ); + StreamProject { base, core, watermark_derivations, nondecreasing_exprs, + noop_update_hint, } } @@ -105,6 +127,10 @@ impl StreamProject { pub fn exprs(&self) -> &Vec { &self.core.exprs } + + pub fn noop_update_hint(&self) -> bool { + self.noop_update_hint + } } impl PlanTreeNodeUnary for StreamProject { @@ -115,7 +141,7 @@ impl PlanTreeNodeUnary for StreamProject { fn clone_with_input(&self, input: PlanRef) -> Self { let mut core = self.core.clone(); core.input = input; - Self::new(core) + Self::new_inner(core, self.noop_update_hint) } } impl_plan_tree_node_for_unary! {StreamProject} @@ -132,6 +158,7 @@ impl StreamNode for StreamProject { watermark_input_cols, watermark_output_cols, nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), + noop_update_hint: self.noop_update_hint, }) } } @@ -144,7 +171,7 @@ impl ExprRewritable for StreamProject { fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { let mut core = self.core.clone(); core.rewrite_exprs(r); - Self::new(core).into() + Self::new_inner(core, self.noop_update_hint).into() } } diff --git a/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs b/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs index 0eba52e193d2..91ab942e3a7f 100644 --- a/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/stream_project_merge_rule.rs @@ -47,7 +47,15 @@ impl Rule for StreamProjectMergeRule { .map(|expr| subst.rewrite_expr(expr)) .collect(); let logical_project = generic::Project::new(exprs, inner_project.input()); - Some(StreamProject::new(logical_project).into()) + + // If either of the projects has the hint, we should keep it. + let noop_update_hint = outer_project.noop_update_hint() || inner_project.noop_update_hint(); + + Some( + StreamProject::new(logical_project) + .with_noop_update_hint(noop_update_hint) + .into(), + ) } } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index 838527b32382..f2d9a4324172 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -345,9 +345,7 @@ fn make_stream_fragments() -> Vec { make_inputref(0), make_inputref(1), ], - watermark_input_cols: vec![], - watermark_output_cols: vec![], - nondecreasing_exprs: vec![], + ..Default::default() })), fields: vec![], // TODO: fill this later input: vec![simple_agg_node_1], diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index e341adad9df5..8f73fe26ee7d 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -178,6 +178,7 @@ async fn test_merger_sum_aggr() { MultiMap::new(), vec![], 0.0, + false, ); let items = Arc::new(Mutex::new(vec![])); diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 22ce33a1066d..6ea579afea52 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -44,6 +44,10 @@ struct Inner { /// the selectivity threshold which should be in `[0,1]`. for the chunk with selectivity less /// than the threshold, the Project executor will construct a new chunk before expr evaluation, materialize_selectivity_threshold: f64, + + /// Whether there are likely no-op updates in the output chunks, so that eliminating them with + /// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial. + noop_update_hint: bool, } impl ProjectExecutor { @@ -55,6 +59,7 @@ impl ProjectExecutor { watermark_derivations: MultiMap, nondecreasing_expr_indices: Vec, materialize_selectivity_threshold: f64, + noop_update_hint: bool, ) -> Self { let n_nondecreasing_exprs = nondecreasing_expr_indices.len(); Self { @@ -66,6 +71,7 @@ impl ProjectExecutor { nondecreasing_expr_indices, last_nondec_expr_values: vec![None; n_nondecreasing_exprs], materialize_selectivity_threshold, + noop_update_hint, }, } } @@ -103,7 +109,11 @@ impl Inner { projected_columns.push(evaluated_expr); } let (_, vis) = data_chunk.into_parts(); - let new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis); + + let mut new_chunk = StreamChunk::with_visibility(ops, projected_columns, vis); + if self.noop_update_hint { + new_chunk = new_chunk.eliminate_adjacent_noop_update(); + } Ok(Some(new_chunk)) } @@ -233,6 +243,7 @@ mod tests { MultiMap::new(), vec![], 0.0, + false, ); let mut project = project.boxed().execute(); @@ -315,6 +326,7 @@ mod tests { MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()), vec![2], 0.0, + false, ); let mut project = project.boxed().execute(); diff --git a/src/stream/src/from_proto/project.rs b/src/stream/src/from_proto/project.rs index d7f96c4dffcb..177045b3eba7 100644 --- a/src/stream/src/from_proto/project.rs +++ b/src/stream/src/from_proto/project.rs @@ -67,6 +67,7 @@ impl ExecutorBuilder for ProjectExecutorBuilder { watermark_derivations, nondecreasing_expr_indices, materialize_selectivity_threshold, + node.noop_update_hint, ); Ok((params.info, exec).into()) }