Skip to content

Commit

Permalink
[FLINK-19059]: Support non-time retract mode for OverAggregate operator
Browse files Browse the repository at this point in the history
- Non-time attribute over aggregates can support retract mode
- Time attribute over aggregates support only append mode
  • Loading branch information
bvarghese1 committed Dec 6, 2024
1 parent b25ef72 commit d0b856b
Show file tree
Hide file tree
Showing 16 changed files with 2,119 additions and 76 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.catalog.{ManagedTableListener, ResolvedCatalogBaseTable}
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.`trait`._
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
Expand Down Expand Up @@ -247,9 +248,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

case over: StreamPhysicalOverAggregate =>
// OverAggregate can only support insert for row-time/proc-time sort keys
var overRequiredTrait = ModifyKindSetTrait.INSERT_ONLY
val builder = ModifyKindSet
.newBuilder()
.addContainedKind(ModifyKind.INSERT)

// All aggregates are computed over the same window and order by is supported for only 1 field
val orderKeyIndex =
over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
if (
!FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType)
&& !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
) {
// Only non row-time/proc-time sort can support UPDATES
builder.addContainedKind(ModifyKind.UPDATE)
builder.addContainedKind(ModifyKind.DELETE)
overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES
}
val children = visitChildren(over, overRequiredTrait)
val providedTrait = new ModifyKindSetTrait(builder.build())
createNewNode(over, children, providedTrait, requiredTrait, requester)

case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate =>
// TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only
_: StreamPhysicalPythonOverAggregate =>
// TemporalSort, IntervalJoin only support consuming insert-only
// and producing insert-only changes
val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)
Expand Down Expand Up @@ -469,19 +494,18 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
_: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
_: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
_: StreamPhysicalWindowAggregate =>
// Aggregate, TableAggregate, Limit, GroupWindowAggregate, WindowAggregate,
_: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate =>
// Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate,
// and WindowTableAggregate requires update_before if there are updates
val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
val children = visitChildren(rel, requiredChildTrait)
// use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
createNewNode(rel, children, requiredTrait)

case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
_: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin |
_: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate,
// WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP,
// and IntervalJoin, WindowJoin require nothing about UpdateKind.
val children = visitChildren(rel, UpdateKindTrait.NONE)
createNewNode(rel, children, requiredTrait)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import static org.apache.flink.table.api.config.TableConfigOptions.LOCAL_TIME_ZONE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.common.OverAggregateTestPrograms;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

Expand All @@ -36,7 +37,7 @@ public OverWindowRestoreTest() {
@Override
protected Stream<String> getSavepointPaths(
TableTestProgram program, ExecNodeMetadata metadata) {
if (metadata.version() == 1) {
if (metadata.version() == 1 && program.equals(OverWindowTestPrograms.LAG_OVER_FUNCTION)) {
return Stream.concat(
super.getSavepointPaths(program, metadata),
// See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over
Expand All @@ -50,6 +51,11 @@ protected Stream<String> getSavepointPaths(

@Override
public List<TableTestProgram> programs() {
return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION);
return Arrays.asList(
OverWindowTestPrograms.LAG_OVER_FUNCTION,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_RETRACT_MODE,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS,
OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY);
}
}
Loading

0 comments on commit d0b856b

Please sign in to comment.