[FLINK-19059]: Support non-time retract mode for OverAggregate operator #25753

Open · wants to merge 3 commits into master
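In short: the streaming OverAggregate operator previously required insert-only input ordered on a row-time or proc-time attribute. This PR lets an unbounded OVER aggregate whose ORDER BY key is a regular (non-time) column consume and emit updates (retract mode). A minimal sketch of the query shape this enables, using the source_t/sink_t tables defined in the tests below (table and column names come from those tests, not from any fixed API):

-- Unbounded OVER aggregate ordered on a plain BIGINT column rather than a
-- time attribute; with this change the planner accepts an updating source.
INSERT INTO sink_t
SELECT key, val, ts,
    SUM(val) OVER (
        PARTITION BY key
        ORDER BY val  -- non-time sort key: updates/deletes are now supported
        RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum_val
FROM source_t;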
@@ -629,10 +629,9 @@ private StreamPhysicalRel visitOverAggregate(
             }
             return transmitDeterminismRequirement(overAgg, NO_REQUIRED_DETERMINISM);
         } else {
-            // OverAgg does not support input with updates currently, so this branch will not be
-            // reached for now.
-
-            // We should append partition keys and order key to requireDeterminism
+            // OverAgg does not support input with updates when the order-by column is a
+            // time attribute; only a non-time order-by attribute can support updates.
+            // Append the partition keys and order key to requireDeterminism.
             return transmitDeterminismRequirement(
                     overAgg, mappingRequireDeterminismToInput(requireDeterminism, overAgg));
         }
@@ -982,6 +981,10 @@ private ImmutableBitSet mappingRequireDeterminismToInput(
         // add aggCall's input
         int aggOutputIndex = inputFieldCnt;
         for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
+            // Add sort fields
+            Arrays.stream(groupSpec.getSort().getFieldIndices())
+                    .forEach(allRequiredInputSet::add);
+            // Add aggregation fields
             for (AggregateCall aggCall : groupSpec.getAggCalls()) {
                 if (requireDeterminism.get(aggOutputIndex)) {
                     requiredSourceInput(aggCall, allRequiredInputSet);
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.catalog.{ManagedTableListener, ResolvedCatalogBaseTable}
 import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.`trait`._
 import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
@@ -247,9 +248,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
         createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

+      case over: StreamPhysicalOverAggregate =>
+        // OverAggregate can only support insert-only input for row-time/proc-time sort keys
+        var overRequiredTrait = ModifyKindSetTrait.INSERT_ONLY
+        val builder = ModifyKindSet
+          .newBuilder()
+          .addContainedKind(ModifyKind.INSERT)
+
+        // All aggregates are computed over the same window, and ORDER BY supports only one field
+        val orderKeyIndex =
+          over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
+        val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
+        if (
+          !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
+          && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
+        ) {
+          // Only a non-row-time/proc-time sort key can support updates
+          builder.addContainedKind(ModifyKind.UPDATE)
+          builder.addContainedKind(ModifyKind.DELETE)
+          overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
+        }
+        val children = visitChildren(over, overRequiredTrait)
+        val providedTrait = new ModifyKindSetTrait(builder.build())
+        createNewNode(over, children, providedTrait, requiredTrait, requester)
+
       case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
-          _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate =>
-        // TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
+          _: StreamPhysicalPythonOverAggregate =>
+        // TemporalSort, IntervalJoin only support consuming insert-only
         // and producing insert-only changes
         val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
         createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
@@ -469,19 +494,18 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
          _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
          _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
-          _: StreamPhysicalWindowAggregate =>
-        // Aggregate, TableAggregate, Limit, GroupWindowAggregate, WindowAggregate,
+          _: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate =>
+        // Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate,
         // and WindowTableAggregate requires update_before if there are updates
         val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
         val children = visitChildren(rel, requiredChildTrait)
         // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
         createNewNode(rel, children, requiredTrait)

       case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
-          _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
-          _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+          _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin |
           _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
-        // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate,
+        // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP,
         // and IntervalJoin, WindowJoin require nothing about UpdateKind.
         val children = visitChildren(rel, UpdateKindTrait.NONE)
         createNewNode(rel, children, requiredTrait)
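Taken together, the two hunks above change the changelog-mode contract of StreamPhysicalOverAggregate: with a time-attribute sort key the operator still requires and provides insert-only changes, while a non-time sort key switches it to consuming and producing all change kinds, with update_before required from the input when updates exist. A hedged SQL illustration of the two regimes (rowtime_col is a hypothetical row-time attribute; val is the plain BIGINT column from the tests below):

-- Time-attribute sort key: operator stays insert-only, as before.
SELECT key, SUM(val) OVER (PARTITION BY key ORDER BY rowtime_col) AS s
FROM source_t;  -- input must be append-only

-- Non-time sort key: the operator may now consume I/UB/UA/D and emit retractions.
SELECT key, SUM(val) OVER (PARTITION BY key ORDER BY val) AS s
FROM source_t;  -- an updating input is accepted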
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Tests NDU (non-deterministic update) errors for the over aggregate operator. */
class OverAggregateNDUTest extends TableTestBase {

private StreamTableTestUtil util;
private TableEnvironment tEnv;

private static final String EXPECTED_NDU_ERROR_MSG =
"The column(s): $3(generated by non-deterministic function: UUID ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions.\n"
+ "\n"
+ "related rel plan:\n"
+ "Calc(select=[key, val, ts, UUID() AS $3], changelogMode=[I,UB,UA])\n"
+ "+- TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts], changelogMode=[I,UB,UA])\n";

@BeforeEach
void setup() {
util = streamTestUtil(TableConfig.getDefault());
tEnv = util.getTableEnv();
tEnv.getConfig()
.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
NonDeterministicUpdateStrategy.TRY_RESOLVE);

String sourceTable =
"CREATE TABLE source_t(\n"
+ " key STRING,\n"
+ " val BIGINT,\n"
+ " ts BIGINT\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false',\n"
+ " 'changelog-mode' = 'I,UB,UA')";
tEnv.executeSql(sourceTable);

String sinkTable =
"CREATE TABLE sink_t(\n"
+ " key STRING,\n"
+ " val BIGINT,\n"
+ " ts BIGINT,\n"
+ " sum_val BIGINT\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false',\n"
+ " 'sink-insert-only' = 'false',\n"
+ " 'changelog-mode' = 'I,UB,UA')";
tEnv.executeSql(sinkTable);
}

@Test
void testOverAggregateWithNonDeterminismInPartitionBy() {
String sql =
"INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER ("
+ "PARTITION BY UUID() "
+ "ORDER BY val "
+ "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
+ "AS sum_val "
+ "FROM source_t";

TableException tableException =
assertThrows(TableException.class, () -> util.verifyJsonPlan(sql));

assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage());
}

@Test
void testOverAggregateWithNonDeterminismInOrderBy() {
String sql =
"INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER ("
+ "PARTITION BY key "
+ "ORDER BY UUID() "
+ "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
+ "AS sum_val "
+ "FROM source_t";

TableException tableException =
assertThrows(TableException.class, () -> util.verifyJsonPlan(sql));

assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage());
}

@Test
void testOverAggregateWithNonDeterminismInProjection() {
String sql =
"INSERT INTO sink_t SELECT UUID(), val, ts, SUM(val) OVER ("
+ "PARTITION BY key "
+ "ORDER BY val "
+ "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
+ "AS sum_val "
+ "FROM source_t";

TableException tableException =
assertThrows(TableException.class, () -> util.verifyJsonPlan(sql));

assertEquals(EXPECTED_NDU_ERROR_MSG, tableException.getMessage());
}
}
@@ -23,7 +23,7 @@
 import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;

-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Stream;

@@ -36,7 +36,7 @@ public OverWindowRestoreTest() {
     @Override
     protected Stream<String> getSavepointPaths(
             TableTestProgram program, ExecNodeMetadata metadata) {
-        if (metadata.version() == 1) {
+        if (metadata.version() == 1 && program.equals(OverWindowTestPrograms.LAG_OVER_FUNCTION)) {
             return Stream.concat(
                     super.getSavepointPaths(program, metadata),
                     // See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over
@@ -50,6 +50,11 @@ protected Stream<String> getSavepointPaths(

     @Override
     public List<TableTestProgram> programs() {
-        return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION);
+        return Arrays.asList(
+                OverWindowTestPrograms.LAG_OVER_FUNCTION,
+                OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_RETRACT_MODE,
+                OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE,
+                OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS,
+                OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY);
     }
 }
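The newly registered restore programs suggest the query shapes being covered; sketches of two of them, inferred from the program names only (the actual TableTestProgram definitions live in OverWindowTestPrograms and may differ):

-- OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY: no PARTITION BY clause.
SELECT key, val, SUM(val) OVER (ORDER BY val) AS sum_val FROM source_t;

-- OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS: several
-- aggregates sharing one non-time window.
SELECT key, val,
    SUM(val) OVER w AS sum_val,
    COUNT(val) OVER w AS cnt_val
FROM source_t
WINDOW w AS (PARTITION BY key ORDER BY val);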