From 3a40c09a543f38a3624587f5ad9934ebfd622046 Mon Sep 17 00:00:00 2001 From: Yiyu Tian Date: Tue, 17 Sep 2024 12:56:00 -0400 Subject: [PATCH] [FLINK-12173][table] Optimize SELECT DISTINCT --- .../plan/rules/FlinkStreamRuleSets.scala | 4 +- ...eamLogicalOptimizeSelectDistinctRule.scala | 140 ++++++++++++++++++ .../MatchRecognizeValidationTest.java | 2 +- .../planner/plan/common/PartialInsertTest.xml | 32 ++-- .../plan/hints/stream/StateTtlHintTest.xml | 12 +- .../plan/stream/sql/DeduplicateTest.xml | 2 +- .../stream/sql/NonDeterministicDagTest.xml | 2 +- .../planner/plan/stream/sql/RankTest.xml | 23 +++ .../plan/stream/sql/SetOperatorsTest.xml | 10 +- .../plan/stream/sql/SubplanReuseTest.xml | 2 +- .../plan/stream/sql/agg/GroupingSetsTest.xml | 2 +- .../stream/sql/agg/WindowAggregateTest.xml | 31 ++-- .../plan/stream/sql/join/IntervalJoinTest.xml | 11 +- .../planner/plan/stream/sql/join/JoinTest.xml | 68 ++++----- .../plan/stream/sql/join/SemiAntiJoinTest.xml | 24 +-- .../plan/stream/sql/join/WindowJoinTest.xml | 72 +++++---- .../validation/AggregateValidationTest.xml | 5 +- .../plan/stream/sql/DeduplicateTest.scala | 1 - .../planner/plan/stream/sql/RankTest.scala | 11 ++ 19 files changed, 322 insertions(+), 132 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamLogicalOptimizeSelectDistinctRule.scala diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index e3abb9b1f04343..374d609e4eb52a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -416,7 +416,9 @@ object FlinkStreamRuleSets { // Avoid async calls which call async calls. AsyncCalcSplitRule.NESTED_SPLIT, // Avoid having async calls in multiple projections in a single calc. - AsyncCalcSplitRule.ONE_PER_CALC_SPLIT + AsyncCalcSplitRule.ONE_PER_CALC_SPLIT, + // Optimize SELECT DISTINCT to use FlinkLogicalRank + StreamLogicalOptimizeSelectDistinctRule.INSTANCE ) /** RuleSet to do physical optimize for stream */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamLogicalOptimizeSelectDistinctRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamLogicalOptimizeSelectDistinctRule.scala new file mode 100644 index 00000000000000..6ae7238837ef94 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamLogicalOptimizeSelectDistinctRule.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.rules.physical.stream +import org.apache.flink.table.planner.JList +import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.nodes.FlinkConventions +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalAggregate, FlinkLogicalCalc, FlinkLogicalJoin, FlinkLogicalRank} +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalIntervalJoin, StreamPhysicalRank, StreamPhysicalTemporalSort} +import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil, WindowJoinUtil} +import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} +import org.apache.flink.table.types.logical.IntType + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelDataTypeSystem} +import org.apache.calcite.rel.{RelCollation, RelCollations, RelFieldCollation} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule.Config +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.hint.RelHint +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.rex.{RexInputRef, RexNode, RexProgram} +import org.apache.calcite.util.ImmutableBitSet + +import java.util +import java.util.Collections + +import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable` + +/** + * Rule that matches [[FlinkLogicalAggregate]], and converts it to [[FlinkLogicalRank]] in the case + * of SELECT DISTINCT queries. + * + * e.g. {SELECT DISTINCT a, b, c;} will be converted to [[FlinkLogicalRank]] instead of + * [[FlinkLogicalAggregate]] in rowtime. + */ +class StreamLogicalOptimizeSelectDistinctRule + extends RelOptRule(operand(classOf[FlinkLogicalAggregate], any)) { + private val classLoader = Thread.currentThread().getContextClassLoader + private val typeSystem = RelDataTypeSystem.DEFAULT + private val typeFactory = new FlinkTypeFactory(classLoader, typeSystem) + private val intType: RelDataType = + typeFactory.createFieldTypeFromLogicalType(new IntType(false)) + + override def matches(call: RelOptRuleCall): Boolean = { + val rel: FlinkLogicalAggregate = call.rel(0) + // check if it's a SELECT DISTINCT query + rel.getGroupSet.cardinality() == rel.getRowType.getFieldCount && rel.getAggCallList.isEmpty + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val agg: FlinkLogicalAggregate = call.rel(0) + + val input = agg.getInput + val traitSet = agg.getTraitSet + val cluster = agg.getCluster + + // Create a list of all group keys + val groupKeys = agg.getGroupSet + + // Create a projection to select only the fields in the DISTINCT clause + val projectList: JList[RexNode] = { + val list = new util.ArrayList[RexNode](groupKeys.cardinality()) + groupKeys.toList.foreach(i => list.add(RexInputRef.of(i, input.getRowType))) + list + } + + // Create the projected row type + val projectedFields: JList[RelDataTypeField] = { + val fields = new util.ArrayList[RelDataTypeField](projectList.size()) + projectList.forEach { + rexNode => + val rexInputRef = rexNode.asInstanceOf[RexInputRef] + fields.add(input.getRowType.getFieldList.get(rexInputRef.getIndex)) + } + fields + } + + // Create the projected row type + val projectedRowType: RelDataType = typeFactory.createStructType(projectedFields) + + // Create a RelCollation based on all group keys + val fieldCollations: JList[RelFieldCollation] = { + val collations = new util.ArrayList[RelFieldCollation](groupKeys.cardinality()) + groupKeys.foreach { + key => collations.add(new RelFieldCollation(key, RelFieldCollation.Direction.ASCENDING)) + } + collations + } + val collation: RelCollation = RelCollations.of(fieldCollations) + + // Create a projection to select only the fields in the DISTINCT clause + val projection: RelNode = FlinkLogicalCalc.create( + input, + RexProgram.create( + input.getRowType, + projectList, + null, + projectedRowType, + cluster.getRexBuilder + )) + + val rank = new FlinkLogicalRank( + cluster, + traitSet, + projection, + groupKeys, + collation, + RankType.ROW_NUMBER, + new ConstantRankRange(1, 1), // We only want the first row for each group + new RelDataTypeFieldImpl("rk", projectedRowType.getFieldCount - 1, intType), + false + ) + try RankUtil.canConvertToDeduplicate(rank) + catch { + case _: Exception => + return + } + call.transformTo(rank) + } +} + +object StreamLogicalOptimizeSelectDistinctRule { + val INSTANCE = new StreamLogicalOptimizeSelectDistinctRule() +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java index c0ca9fbf975c7d..f217858c3e4c69 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java @@ -188,7 +188,7 @@ void testUpdatesInUpstreamOperatorNotSupported() { assertThatExceptionOfType(TableException.class) .isThrownBy(() -> tEnv.executeSql(sqlQuery)) .withMessageContaining( - "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate("); + "Match Recognize doesn't support consuming update and delete changes which is produced by node Rank("); } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml index 2064ed8df954d2..8bc7a0cae4158b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml @@ -27,14 +27,14 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], targetCol == Optimized Physical Plan == Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[4],[0],[3]], fields=[a, EXPR$1, d, e, EXPR$4, EXPR$5]) +- Calc(select=[a, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS EXPR$1, d, e, null:BIGINT AS EXPR$4, null:INTEGER AS EXPR$5]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) == Optimized Execution Plan == Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[4],[0],[3]], fields=[a, EXPR$1, d, e, EXPR$4, EXPR$5]) +- Calc(select=[a, null:VARCHAR(2147483647) AS EXPR$1, d, e, null:BIGINT AS EXPR$4, null:INTEGER AS EXPR$5]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> @@ -119,7 +119,7 @@ LogicalSink(table=[default_catalog.default_database.sink], targetColumns=[[1],[4 @@ -164,7 +164,7 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], targetCol @@ -221,11 +221,11 @@ Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[ +- Exchange(distribution=[hash[a, c, d, e, f, g]]) +- Union(all=[true], union=[a, c, d, e, f, g, vcol_marker]) :- Calc(select=[a, c, d, e, CAST(123 AS BIGINT) AS f, CAST(456 AS INTEGER) AS g, 1 AS vcol_marker]) - : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) : +- Exchange(distribution=[hash[a, b, c, d, e]]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +- Calc(select=[a, c, d, e, CAST(123 AS BIGINT) AS f, CAST(456 AS INTEGER) AS g, -1 AS vcol_marker]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> @@ -299,11 +299,11 @@ Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[ +- Exchange(distribution=[hash[a, c, d, e, f, g]]) +- Union(all=[true], union=[a, c, d, e, f, g, vcol_left_marker, vcol_right_marker]) :- Calc(select=[a, c, d, e, CAST(123 AS BIGINT) AS f, CAST(456 AS INTEGER) AS g, true AS vcol_left_marker, null:BOOLEAN AS vcol_right_marker]) - : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) : +- Exchange(distribution=[hash[a, b, c, d, e]]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +- Calc(select=[a, c, d, e, CAST(456 AS BIGINT) AS f, CAST(789 AS INTEGER) AS g, null:BOOLEAN AS vcol_left_marker, true AS vcol_right_marker]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> @@ -455,15 +455,15 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], targetCol @@ -491,11 +491,11 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], targetCol Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[4],[0],[6],[5],[2],[3]], fields=[a, c, d, e, f, g]) +- Union(all=[true], union=[a, c, d, e, f, g]) :- Calc(select=[a, c, d, e, CAST(123 AS BIGINT) AS f, CAST(456 AS INTEGER) AS g]) - : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) : +- Exchange(distribution=[hash[a, b, c, d, e]]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +- Calc(select=[a, c, d, e, CAST(456 AS BIGINT) AS f, CAST(789 AS INTEGER) AS g]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> @@ -612,15 +612,15 @@ Sink(table=[default_catalog.default_database.partitioned_sink], targetColumns=[[ +- Union(all=[true], union=[a, c, d, e, f, g]) :- Union(all=[true], union=[a, c, d, e, f, g]) : :- Calc(select=[a, c, d, e, CAST(123 AS BIGINT) AS f, CAST(456 AS INTEGER) AS g]) - : : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + : : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) : : +- Exchange(distribution=[hash[a, b, c, d, e]]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) : +- Calc(select=[a, c, d, e, CAST(456 AS BIGINT) AS f, CAST(789 AS INTEGER) AS g]) - : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) : +- Exchange(distribution=[hash[a, b, c, d, e]]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +- Calc(select=[a, c, d, e, CAST(456 AS BIGINT) AS f, CAST(123 AS INTEGER) AS g]) - +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b, c, d, e], orderBy=[a ASC, b ASC, c ASC, d ASC, e ASC], select=[a, b, c, d, e]) +- Exchange(distribution=[hash[a, b, c, d, e]]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml index 8b1a70096f7e6d..6eebf11f470806 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml @@ -34,7 +34,7 @@ LogicalProject(EXPR$0=[$1]) Calc(select=[EXPR$0]) +- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], stateTtlHints=[[[STATE_TTL options:[2d]]]]) +- Exchange(distribution=[hash[a1]]) - +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a1, b1], orderBy=[a1 ASC, b1 ASC], select=[a1, b1]) +- Exchange(distribution=[hash[a1, b1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) ]]> @@ -58,7 +58,7 @@ LogicalProject(EXPR$0=[$1]) Calc(select=[EXPR$0]) +- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0]) +- Exchange(distribution=[hash[a1]]) - +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1], stateTtlHints=[[[STATE_TTL options:[4d]]]]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a1, b1], orderBy=[a1 ASC, b1 ASC], select=[a1, b1]) +- Exchange(distribution=[hash[a1, b1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) ]]> @@ -82,7 +82,7 @@ LogicalProject(EXPR$0=[$1]) Calc(select=[EXPR$0]) +- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], stateTtlHints=[[[STATE_TTL options:[2d]]]]) +- Exchange(distribution=[hash[a1]]) - +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1], stateTtlHints=[[[STATE_TTL options:[4d]]]]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a1, b1], orderBy=[a1 ASC, b1 ASC], select=[a1, b1]) +- Exchange(distribution=[hash[a1, b1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) ]]> @@ -162,7 +162,7 @@ LogicalProject(EXPR$0=[$1]) Calc(select=[EXPR$0]) +- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], stateTtlHints=[[[STATE_TTL options:[2d]]]]) +- Exchange(distribution=[hash[a1]]) - +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a1, b1], orderBy=[a1 ASC, b1 ASC], select=[a1, b1]) +- Exchange(distribution=[hash[a1, b1]]) +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1]) ]]> @@ -524,7 +524,7 @@ Calc(select=[a, b, c]) : +- WatermarkAssigner(rowtime=[c], watermark=[c]) : +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c]) +- Exchange(distribution=[hash[b]]) - +- GroupAggregate(groupBy=[b], select=[b]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[b ASC], select=[b]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b], where=[IS NOT NULL(b)]) +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)]) @@ -563,7 +563,7 @@ Calc(select=[a, b, c]) : +- WatermarkAssigner(rowtime=[c], watermark=[c]) : +- TableSourceScan(table=[[default_catalog, default_database, tableWithWatermark1]], fields=[a, b, c]) +- Exchange(distribution=[hash[b]]) - +- GroupAggregate(groupBy=[b], select=[b]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[b ASC], select=[b]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b], where=[IS NOT NULL(b)]) +- WatermarkAssigner(rowtime=[d], watermark=[-(d, 5000:INTERVAL SECOND)]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml index c23a3820a60a09..72792ac39fad08 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml @@ -390,6 +390,6 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1 AS +- Exchange(distribution=[hash[a]]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> - + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml index a0a00b51d832bd..76bf598e523f54 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml @@ -3330,7 +3330,7 @@ LogicalSink(table=[default_catalog.default_database.sink_with_composite_pk], fie + + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml index 61552b112b2727..636f7c4448dff3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml @@ -31,7 +31,7 @@ LogicalIntersect(all=[false]) @@ -3875,10 +3877,12 @@ Calc(select=[a, window_start, window_end, EXPR$3]) +- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, window_start, window_end]) - +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b], orderBy=[a ASC, b ASC, window_start ASC, window_end ASC], select=[a, b, window_start, window_end]) +- Exchange(distribution=[hash[a, b]]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) + +- Calc(select=[a, b, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) ]]> @@ -3924,11 +3928,12 @@ Calc(select=[a, window_start, window_end, EXPR$3]) +- Exchange(distribution=[hash[a]]) +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b], orderBy=[a ASC, b ASC, window_start ASC, window_end ASC], select=[a, b, window_start, window_end]) +- Exchange(distribution=[hash[a, b]]) - +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) + +- Calc(select=[a, b, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml index 6f8cbd5b0c6b40..d0ee6d477b1e95 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml @@ -46,7 +46,7 @@ Calc(select=[a]) : +- Calc(select=[a, b, CAST(rowtime AS TIMESTAMP(3)) AS rowtime]) : +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1]) +- Exchange(distribution=[hash[b0, rowtime0, a]]) - +- GroupAggregate(groupBy=[a, b0, rowtime0], select=[a, b0, rowtime0]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a, b0, rowtime0], orderBy=[a ASC, b0 ASC, rowtime0 ASC], select=[a, b0, rowtime0]) +- Exchange(distribution=[hash[a, b0, rowtime0]]) +- Calc(select=[a, b0, rowtime0]) +- Join(joinType=[InnerJoin], where=[((b0 = b) AND (rowtime0 >= rowtime) AND (rowtime0 <= (rowtime + 300000:INTERVAL MINUTE)))], select=[a, b, rowtime, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey]) @@ -54,10 +54,11 @@ Calc(select=[a]) : +- Calc(select=[a, b, CAST(rowtime AS TIMESTAMP(3)) AS rowtime]) : +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) +- Exchange(distribution=[hash[b]]) - +- GroupAggregate(groupBy=[b, rowtime], select=[b, rowtime]) - +- Exchange(distribution=[hash[b, rowtime]]) - +- Calc(select=[b, CAST(rowtime AS TIMESTAMP(3)) AS rowtime]) - +- Reused(reference_id=[1]) + +- Calc(select=[b, CAST(rowtime AS TIMESTAMP(3)) AS rowtime]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b, rowtime], orderBy=[ROWTIME b ASC, rowtime ASC], select=[b, rowtime]) + +- Exchange(distribution=[hash[b, rowtime]]) + +- Calc(select=[b, rowtime]) + +- Reused(reference_id=[1]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml index 9477bd2122452e..4498843a3c94b9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml @@ -160,15 +160,15 @@ LogicalProject(a1=[$1], b1=[$3]) @@ -228,10 +228,10 @@ LogicalProject(a1=[$1], b1=[$2]) @@ -505,15 +505,15 @@ LogicalProject(a1=[$1], b1=[$3]) @@ -1096,10 +1096,10 @@ LogicalProject(a1=[$1], b1=[$2]) @@ -1528,10 +1528,10 @@ LogicalProject(a1=[$1], b1=[$2]) 10)]) @@ -531,7 +531,7 @@ Join(joinType=[LeftAntiJoin], where=[(b <> e)], select=[a, b, c], leftInputSpec= : : +- GroupAggregate(select=[MIN(i) AS m]) : : +- Exchange(distribution=[single]) : : +- Calc(select=[true AS i]) -: : +- GroupAggregate(groupBy=[l], select=[l]) +: : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[l], orderBy=[l ASC], select=[l]) : : +- Exchange(distribution=[hash[l]]) : : +- Calc(select=[l], where=[LIKE(n, 'Test')]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(l, m, n)]]], fields=[l, m, n]) @@ -695,7 +695,7 @@ Calc(select=[b]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])(reuse_id=[1]) : +- Exchange(distribution=[single]) : +- Calc(select=[true AS i]) - : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[EXPR$0], orderBy=[EXPR$0 ASC], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- Calc(select=[1 AS EXPR$0]) : +- Reused(reference_id=[1]) @@ -996,7 +996,7 @@ Calc(select=[a]) : : +- Calc(select=[d, e]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) : +- Exchange(distribution=[hash[j]]) - : +- GroupAggregate(groupBy=[j], select=[j]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[j], orderBy=[j ASC], select=[j]) : +- Exchange(distribution=[hash[j]]) : +- Calc(select=[j]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])(reuse_id=[1]) @@ -1237,7 +1237,7 @@ Calc(select=[b]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])(reuse_id=[1]) : +- Exchange(distribution=[single]) : +- Calc(select=[true AS i]) - : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[EXPR$0], orderBy=[EXPR$0 ASC], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- Calc(select=[1 AS EXPR$0]) : +- Reused(reference_id=[1]) @@ -1603,7 +1603,7 @@ Calc(select=[a]) : +- Calc(select=[a, b]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +- Exchange(distribution=[hash[e]]) - +- GroupAggregate(groupBy=[e], select=[e]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[e], orderBy=[e ASC], select=[e]) +- Exchange(distribution=[hash[e]]) +- Union(all=[true], union=[e]) :- Calc(select=[e], where=[(d > 10)]) @@ -2291,7 +2291,7 @@ Calc(select=[b]) : : : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j, k])(reuse_id=[2]) : : : +- Exchange(distribution=[hash[i]]) : : : +- Calc(select=[i, true AS i0]) - : : : +- GroupAggregate(groupBy=[i], select=[i]) + : : : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[i], orderBy=[i ASC], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[single]) @@ -2301,7 +2301,7 @@ Calc(select=[b]) : : +- Reused(reference_id=[2]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- Calc(select=[EXPR$0, true AS i]) - : +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[EXPR$0], orderBy=[EXPR$0 ASC], select=[EXPR$0]) : +- Exchange(distribution=[hash[EXPR$0]]) : +- Reused(reference_id=[3]) +- Exchange(distribution=[hash[f]]) @@ -2535,7 +2535,7 @@ Calc(select=[b]) : : : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t1, source: [TestTableSource(i)]]], fields=[i])(reuse_id=[1]) : : : +- Exchange(distribution=[hash[i]]) : : : +- Calc(select=[i, true AS i0]) - : : : +- GroupAggregate(groupBy=[i], select=[i]) + : : : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[i], orderBy=[i ASC], select=[i]) : : : +- Exchange(distribution=[hash[i]]) : : : +- Reused(reference_id=[1]) : : +- Exchange(distribution=[single]) @@ -2544,7 +2544,7 @@ Calc(select=[b]) : : +- LegacyTableSourceScan(table=[[default_catalog, default_database, t2, source: [TestTableSource(j)]]], fields=[j])(reuse_id=[2]) : +- Exchange(distribution=[hash[j]]) : +- Calc(select=[j, true AS i]) - : +- GroupAggregate(groupBy=[j], select=[j]) + : +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[j], orderBy=[j ASC], select=[j]) : +- Exchange(distribution=[hash[j]]) : +- Reused(reference_id=[2]) +- Exchange(distribution=[single]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml index a0a733641c3740..ada5352d255a44 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml @@ -78,11 +78,12 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], si : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[a ASC, window_start ASC, window_end ASC, window_time ASC], select=[a, window_start, window_end, window_time]) +- Exchange(distribution=[hash[a]]) - +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) + +- Calc(select=[a, window_start, window_end, window_time]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) ]]> @@ -149,11 +150,12 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], si : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) +- Exchange(distribution=[single]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[a ASC, window_start ASC, window_end ASC, window_time ASC], select=[a, window_start, window_end, window_time]) +- Exchange(distribution=[hash[a]]) - +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) + +- Calc(select=[a, window_start, window_end, window_time]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) ]]> @@ -201,33 +203,37 @@ Sink(table=[default_catalog.default_database.sink1], fields=[window_start, windo +- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour]) +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0]) :- Exchange(distribution=[hash[user_id]]) - : +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end]) + : +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[user_id], orderBy=[user_id ASC, window_start ASC, window_end ASC], select=[user_id, window_start, window_end]) : +- Exchange(distribution=[hash[user_id]]) - : +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end]) - : +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) - : +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) + : +- Calc(select=[user_id, window_start, window_end]) + : +- WindowTableFunction(window=[TUMBLE(time_col=[event_time], size=[1 min])]) + : +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) + : +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) +- Exchange(distribution=[hash[user_id]]) - +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[user_id], orderBy=[user_id ASC, window_start ASC, window_end ASC], select=[user_id, window_start, window_end]) +- Exchange(distribution=[hash[user_id]]) - +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) - +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) + +- Calc(select=[user_id, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[event_time], size=[1 min])]) + +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) + +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) Sink(table=[default_catalog.default_database.sink2], fields=[window_start, window_end, user_id, dt, hour]) +- Calc(select=[window_start, window_end, user_id, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'yyyyMMdd') AS dt, DATE_FORMAT(+(window_end, 25200000:INTERVAL HOUR), 'HH') AS hour]) +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], joinType=[LeftOuterJoin], where=[=(user_id, user_id0)], select=[user_id, window_start, window_end, user_id0, window_start0, window_end0]) :- Exchange(distribution=[hash[user_id]]) - : +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end]) + : +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[user_id], orderBy=[user_id ASC, window_start ASC, window_end ASC], select=[user_id, window_start, window_end]) : +- Exchange(distribution=[hash[user_id]]) - : +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end]) - : +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) - : +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) + : +- Calc(select=[user_id, window_start, window_end]) + : +- WindowTableFunction(window=[TUMBLE(time_col=[event_time], size=[1 min])]) + : +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) + : +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) +- Exchange(distribution=[hash[user_id]]) - +- GlobalWindowAggregate(groupBy=[user_id], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[user_id, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[user_id], orderBy=[user_id ASC, window_start ASC, window_end ASC], select=[user_id, window_start, window_end]) +- Exchange(distribution=[hash[user_id]]) - +- LocalWindowAggregate(groupBy=[user_id], window=[TUMBLE(time_col=[event_time], size=[1 min])], select=[user_id, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) - +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) + +- Calc(select=[user_id, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[event_time], size=[1 min])]) + +- WatermarkAssigner(rowtime=[event_time], watermark=[event_time]) + +- TableSourceScan(table=[[default_catalog, default_database, food_order, project=[user_id, event_time], metadata=[]]], fields=[user_id, event_time]) ]]> @@ -1440,11 +1446,12 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], si : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[a ASC, window_start ASC, window_end ASC, window_time ASC], select=[a, window_start, window_end, window_time]) +- Exchange(distribution=[hash[a]]) - +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) + +- Calc(select=[a, window_start, window_end, window_time]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) ]]> @@ -1511,11 +1518,12 @@ WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], si : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], select=[a, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[a ASC, window_start ASC, window_end ASC, window_time ASC], select=[a, window_start, window_end, window_time]) +- Exchange(distribution=[hash[a]]) - +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, slice_end('w$) AS $slice_end]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) + +- Calc(select=[a, window_start, window_end, window_time]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.xml index 5db02638d2a7f7..3112f4d50db39a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/validation/AggregateValidationTest.xml @@ -27,9 +27,10 @@ LogicalProject(_c0=[func(_UTF-16LE'abc')]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala index 2bab70f2d6833a..f84e1d4bf1a847 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala @@ -296,5 +296,4 @@ class DeduplicateTest extends TableTestBase { util.verifyExecPlan(sqlQuery) } - } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala index 6ffe892ba851dd..302227be04dffa 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala @@ -1011,5 +1011,16 @@ class RankTest extends TableTestBase { util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE) } + @Test + def testOptimizeSelectDistinct(): Unit = { + val sql = + """ + |SELECT DISTINCT a, b, c + |FROM MyTable + """.stripMargin + + util.verifyExecPlan(sql) + } + // TODO add tests about multi-sinks and udf }