Skip to content

Commit

Permalink
[FLINK-19059] Update NDU checks for OverAggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
bvarghese1 committed Dec 15, 2024
1 parent 2865a3f commit 7484013
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,9 @@ private StreamPhysicalRel visitOverAggregate(
}
return transmitDeterminismRequirement(overAgg, NO_REQUIRED_DETERMINISM);
} else {
// OverAgg does not support input with updates currently, so this branch will not be
// reached for now.

// We should append partition keys and order key to requireDeterminism
// OverAgg does not support input with updates when the ORDER BY column is a time attribute;
// only a non-time ORDER BY attribute can support updates.
// Append the partition keys and order keys to requireDeterminism.
return transmitDeterminismRequirement(
overAgg, mappingRequireDeterminismToInput(requireDeterminism, overAgg));
}
Expand Down Expand Up @@ -982,6 +981,10 @@ private ImmutableBitSet mappingRequireDeterminismToInput(
// add aggCall's input
int aggOutputIndex = inputFieldCnt;
for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
// Add sort fields
Arrays.stream(groupSpec.getSort().getFieldIndices())
.forEach(allRequiredInputSet::add);
// Add aggregation fields
for (AggregateCall aggCall : groupSpec.getAggCalls()) {
if (requireDeterminism.get(aggOutputIndex)) {
requiredSourceInput(aggCall, allRequiredInputSet);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
 * Tests the non-deterministic update (NDU) errors raised for over aggregates.
 *
 * <p>Every test plans an {@code INSERT INTO sink_t ... FROM source_t} statement whose over
 * aggregate uses the non-deterministic function {@code UUID()} in a different position
 * (partition key, order key or projection) and expects planning to fail with a {@link
 * TableException} carrying {@link #EXPECTED_NDU_ERROR_MSG}.
 */
class OverAggregateNDUTest extends TableTestBase {

    /** Exact exception message expected from every test case in this class. */
    private static final String EXPECTED_NDU_ERROR_MSG =
            "The column(s): $3(generated by non-deterministic function: UUID ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.\n"
                    + "\n"
                    + "related rel plan:\n"
                    + "Calc(select=[key, val, ts, UUID() AS $3], changelogMode=[I,UB,UA])\n"
                    + "+- TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts], changelogMode=[I,UB,UA])\n";

    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /**
     * Creates a fresh stream test util, switches the non-deterministic update strategy to {@code
     * TRY_RESOLVE} and registers the {@code source_t} and {@code sink_t} tables, both declared
     * with changelog mode {@code I,UB,UA}.
     */
    @BeforeEach
    void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();
        tEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
                        NonDeterministicUpdateStrategy.TRY_RESOLVE);

        String sourceTable =
                "CREATE TABLE source_t(\n"
                        + " key STRING,\n"
                        + " val BIGINT,\n"
                        + " ts BIGINT\n"
                        + ") with (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'false',\n"
                        + " 'changelog-mode' = 'I,UB,UA')";
        tEnv.executeSql(sourceTable);

        String sinkTable =
                "CREATE TABLE sink_t(\n"
                        + " key STRING,\n"
                        + " val BIGINT,\n"
                        + " ts BIGINT,\n"
                        + " sum_val BIGINT\n"
                        + ") with (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'false',\n"
                        + " 'sink-insert-only' = 'false',\n"
                        + " 'changelog-mode' = 'I,UB,UA')";
        tEnv.executeSql(sinkTable);
    }

    /** {@code UUID()} used as the partition key of the over window. */
    @Test
    void testOverAggregateWithNonDeterminismInPartitionBy() {
        String sql =
                "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER ("
                        + "PARTITION BY UUID() "
                        + "ORDER BY val "
                        + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
                        + "AS sum_val "
                        + "FROM source_t";

        assertNduError(sql);
    }

    /** {@code UUID()} used as the order key of the over window. */
    @Test
    void testOverAggregateWithNonDeterminismInOrderBy() {
        String sql =
                "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER ("
                        + "PARTITION BY key "
                        + "ORDER BY UUID() "
                        + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
                        + "AS sum_val "
                        + "FROM source_t";

        assertNduError(sql);
    }

    /** {@code UUID()} used as a projected column next to the over aggregate. */
    @Test
    void testOverAggregateWithNonDeterminismInProjection() {
        String sql =
                "INSERT INTO sink_t SELECT UUID(), val, ts, SUM(val) OVER ("
                        + "PARTITION BY key "
                        + "ORDER BY val "
                        + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
                        + "AS sum_val "
                        + "FROM source_t";

        assertNduError(sql);
    }

    /**
     * Asserts that verifying the JSON plan of {@code sql} throws a {@link TableException} whose
     * message equals {@link #EXPECTED_NDU_ERROR_MSG}.
     *
     * @param sql the statement handed to {@code util.verifyJsonPlan}
     */
    private void assertNduError(String sql) {
        TableException tableException =
                assertThrows(TableException.class, () -> util.verifyJsonPlan(sql));

        assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
Expand Down

0 comments on commit 7484013

Please sign in to comment.