diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java index 98a1a6782cf1a0..15c6badf6634ab 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java @@ -629,10 +629,9 @@ private StreamPhysicalRel visitOverAggregate( } return transmitDeterminismRequirement(overAgg, NO_REQUIRED_DETERMINISM); } else { - // OverAgg does not support input with updates currently, so this branch will not be - // reached for now. - - // We should append partition keys and order key to requireDeterminism + // OverAgg does not support update input when the ORDER BY column is a time attribute; + // only a non-time ORDER BY attribute can support updates. + // Append the partition and order keys to requireDeterminism. return transmitDeterminismRequirement( overAgg, mappingRequireDeterminismToInput(requireDeterminism, overAgg)); } @@ -982,6 +981,10 @@ private ImmutableBitSet mappingRequireDeterminismToInput( // add aggCall's input int aggOutputIndex = inputFieldCnt; for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) { + // Add sort fields + Arrays.stream(groupSpec.getSort().getFieldIndices()) + .forEach(allRequiredInputSet::add); + // Add aggregation fields for (AggregateCall aggCall : groupSpec.getAggCalls()) { if (requireDeterminism.get(aggOutputIndex)) { requiredSourceInput(aggCall, allRequiredInputSet); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateNDUTest.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateNDUTest.java new file mode 100644 index 00000000000000..9f2565fdc5b19b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateNDUTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test NDU errors for over aggregate. 
*/ +class OverAggregateNDUTest extends TableTestBase { + + private StreamTableTestUtil util; + private TableEnvironment tEnv; + + /** Error message expected by every test in this class. */ + private static final String EXPECTED_NDU_ERROR_MSG = + "The column(s): $3(generated by non-deterministic function: UUID ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.\n" + + "\n" + + "related rel plan:\n" + + "Calc(select=[key, val, ts, UUID() AS $3], changelogMode=[I,UB,UA])\n" + + "+- TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts], changelogMode=[I,UB,UA])\n"; + + @BeforeEach + void setup() { + util = streamTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + tEnv.getConfig() + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, + NonDeterministicUpdateStrategy.TRY_RESOLVE); + + String sourceTable = + "CREATE TABLE source_t(\n" + + " key STRING,\n" + + " val BIGINT,\n" + + " ts BIGINT\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false',\n" + + " 'changelog-mode' = 'I,UB,UA')"; + tEnv.executeSql(sourceTable); + + String sinkTable = + "CREATE TABLE sink_t(\n" + + " key STRING,\n" + + " val BIGINT,\n" + + " ts BIGINT,\n" + + " sum_val BIGINT\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false',\n" + + " 'sink-insert-only' = 'false',\n" + + " 'changelog-mode' = 'I,UB,UA')"; + tEnv.executeSql(sinkTable); + } + + /** A non-deterministic PARTITION BY key (UUID()) must fail with the NDU error. */ + @Test + void testOverAggregateWithNonDeterminismInPartitionBy() { + String sql = + "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (" + + "PARTITION BY UUID() " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + 
"FROM source_t"; + + TableException tableException = + assertThrows(TableException.class, () -> util.verifyJsonPlan(sql)); + + assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage()); + } + + /** A non-deterministic ORDER BY key (UUID()) must fail with the NDU error. */ + @Test + void testOverAggregateWithNonDeterminismInOrderBy() { + String sql = + "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (" + + "PARTITION BY key " + + "ORDER BY UUID() " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + "FROM source_t"; + + TableException tableException = + assertThrows(TableException.class, () -> util.verifyJsonPlan(sql)); + + assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage()); + } + + /** A non-deterministic projected column (UUID()) must fail with the NDU error. */ + @Test + void testOverAggregateWithNonDeterminismInProjection() { + String sql = + "INSERT INTO sink_t SELECT UUID(), val, ts, SUM(val) OVER (" + + "PARTITION BY key " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + "FROM source_t"; + + TableException tableException = + assertThrows(TableException.class, () -> util.verifyJsonPlan(sql)); + + assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage()); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java index 0fb02fae339f95..1edd52a6752b0a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy; import 
org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram;