diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java index 1ec65c852e370..0473e862bbfb1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java @@ -28,7 +28,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; +import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; @@ -44,10 +46,14 @@ import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; import org.apache.flink.table.planner.plan.utils.OverAggregateUtil; +import org.apache.flink.table.planner.plan.utils.SortUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.over.NonTimeUnboundedPrecedingFunction; import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction; import org.apache.flink.table.runtime.operators.over.ProcTimeRowsBoundedPrecedingFunction; import org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction; @@ -55,6 +61,7 @@ import org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction; import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPrecedingFunction; import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction; +import org.apache.flink.table.runtime.operators.over.TimeAttribute; import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.StateConfigUtil; @@ -165,15 +172,15 @@ protected Transformation translateToPlanInternal( final int orderKey = orderKeys[0]; final LogicalType orderKeyType = inputRowType.getFields().get(orderKey).getType(); - // check time field && identify window rowtime attribute - final int rowTimeIdx; + // check time field && identify window time attribute + TimeAttribute timeAttribute; if (isRowtimeAttribute(orderKeyType)) { - rowTimeIdx = orderKey; + timeAttribute = TimeAttribute.ROW_TIME; } else if (isProctimeAttribute(orderKeyType)) { - rowTimeIdx = -1; + timeAttribute = TimeAttribute.PROC_TIME; } else { - throw new TableException( - "OVER windows' ordering in stream mode must be defined on a time attribute."); + timeAttribute = 
TimeAttribute.NON_TIME; + LOG.info("Non-time attribute window detected"); } final List constants = overSpec.getConstants(); @@ -203,8 +210,9 @@ protected Transformation translateToPlanInternal( constants, aggInputRowType, inputRowType, - rowTimeIdx, group.isRows(), + orderKeys, + timeAttribute, config, planner.createRelBuilder(), planner.getTypeFactory()); @@ -227,8 +235,9 @@ protected Transformation translateToPlanInternal( constants, aggInputRowType, inputRowType, - rowTimeIdx, group.isRows(), + orderKeys, + timeAttribute, precedingOffset, config, planner.createRelBuilder(), @@ -269,8 +278,9 @@ protected Transformation translateToPlanInternal( * @param constants the constants in aggregates parameters, such as sum(1) * @param aggInputRowType physical type of the input row which consists of input and constants. * @param inputRowType physical type of the input row which only consists of input. - * @param rowTimeIdx the index of the rowtime field or None in case of processing time. * @param isRowsClause it is a tag that indicates whether the OVER clause is ROWS clause + * @param orderKeys the order by key to sort on + * @param timeAttribute indicates the type of time attribute the OVER clause is based on */ private KeyedProcessFunction createUnboundedOverProcessFunction( CodeGeneratorContext ctx, @@ -278,11 +288,17 @@ private KeyedProcessFunction createUnboundedOverProce List constants, RowType aggInputRowType, RowType inputRowType, - int rowTimeIdx, boolean isRowsClause, + int[] orderKeys, + TimeAttribute timeAttribute, ExecNodeConfig config, RelBuilder relBuilder, FlinkTypeFactory typeFactory) { + boolean[] needRetractions = new boolean[aggCalls.size()]; + if (timeAttribute.equals(TimeAttribute.NON_TIME)) { + Arrays.fill(needRetractions, true); + } + AggregateInfoList aggInfoList = AggregateUtil.transformToStreamAggregateInfoList( typeFactory, @@ -290,56 +306,88 @@ private KeyedProcessFunction createUnboundedOverProce // inputSchema.relDataType aggInputRowType, JavaScalaConversionUtil.toScala(aggCalls), - new boolean[aggCalls.size()], - false, // needRetraction + needRetractions, + false, // needInputCount true, // isStateBackendDataViews true); // needDistinctInfo LogicalType[] fieldTypes = inputRowType.getChildren().toArray(new LogicalType[0]); - AggsHandlerCodeGenerator generator = + + AggsHandlerCodeGenerator aggsGenerator = new AggsHandlerCodeGenerator( ctx, relBuilder, JavaScalaConversionUtil.toScala(Arrays.asList(fieldTypes)), false); // copyInputField - GeneratedAggsHandleFunction genAggsHandler = - generator + aggsGenerator = + aggsGenerator .needAccumulate() // over agg code gen must pass the constants - .withConstants(JavaScalaConversionUtil.toScala(constants)) - .generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList); + .withConstants(JavaScalaConversionUtil.toScala(constants)); + + if (timeAttribute.equals(TimeAttribute.NON_TIME)) { + aggsGenerator.needRetract(); + } + + GeneratedAggsHandleFunction genAggsHandler = + aggsGenerator.generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList); LogicalType[] flattenAccTypes = Arrays.stream(aggInfoList.getAccTypes()) .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType) .toArray(LogicalType[]::new); - if (rowTimeIdx >= 0) { - if (isRowsClause) { - // ROWS unbounded over process function - return new RowTimeRowsUnboundedPrecedingFunction<>( - config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), + switch (timeAttribute) { + case ROW_TIME: + final int rowTimeIdx = 
orderKeys[0]; + if (isRowsClause) { + // ROWS unbounded over process function + return new RowTimeRowsUnboundedPrecedingFunction<>( + config.getStateRetentionTime(), + TableConfigUtils.getMaxIdleStateRetentionTime(config), + genAggsHandler, + flattenAccTypes, + fieldTypes, + rowTimeIdx); + } else { + // RANGE unbounded over process function + return new RowTimeRangeUnboundedPrecedingFunction<>( + config.getStateRetentionTime(), + TableConfigUtils.getMaxIdleStateRetentionTime(config), + genAggsHandler, + flattenAccTypes, + fieldTypes, + rowTimeIdx); + } + case PROC_TIME: + return new ProcTimeUnboundedPrecedingFunction<>( + StateConfigUtil.createTtlConfig(config.getStateRetentionTime()), genAggsHandler, - flattenAccTypes, - fieldTypes, - rowTimeIdx); - } else { - // RANGE unbounded over process function - return new RowTimeRangeUnboundedPrecedingFunction<>( + flattenAccTypes); + case NON_TIME: + final GeneratedRecordEqualiser generatedEqualiser = + new EqualiserCodeGenerator(inputRowType, ctx.classLoader()) + .generateRecordEqualiser("FirstMatchingRowEqualiser"); + + final GeneratedRecordComparator genRecordComparator = + ComparatorCodeGenerator.gen( + config, + ctx.classLoader(), + "SortComparator", + inputRowType, + SortUtil.getAscendingSortSpec(orderKeys)); + + return new NonTimeUnboundedPrecedingFunction<>( config.getStateRetentionTime(), TableConfigUtils.getMaxIdleStateRetentionTime(config), genAggsHandler, + generatedEqualiser, + genRecordComparator, flattenAccTypes, - fieldTypes, - rowTimeIdx); - } - } else { - return new ProcTimeUnboundedPrecedingFunction<>( - StateConfigUtil.createTtlConfig(config.getStateRetentionTime()), - genAggsHandler, - flattenAccTypes); + fieldTypes); + default: + throw new TableException("Unsupported unbounded operation"); } } @@ -352,8 +400,9 @@ private KeyedProcessFunction createUnboundedOverProce * @param constants the constants in aggregates parameters, such as sum(1) * @param aggInputType physical type of the input row which consists of input and constants. * @param inputType physical type of the input row which only consists of input. - * @param rowTimeIdx the index of the rowtime field or None in case of processing time. 
* @param isRowsClause it is a tag that indicates whether the OVER clause is ROWS clause + * @param orderKeys the order by key to sort on + * @param timeAttribute indicates the type of time attribute the OVER clause is based on */ private KeyedProcessFunction createBoundedOverProcessFunction( CodeGeneratorContext ctx, @@ -361,8 +410,9 @@ private KeyedProcessFunction createBoundedOverProcess List constants, RowType aggInputType, RowType inputType, - int rowTimeIdx, boolean isRowsClause, + int[] orderKeys, + TimeAttribute timeAttribute, long precedingOffset, ExecNodeConfig config, RelBuilder relBuilder, @@ -403,33 +453,44 @@ private KeyedProcessFunction createBoundedOverProcess .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType) .toArray(LogicalType[]::new); - if (rowTimeIdx >= 0) { - if (isRowsClause) { - return new RowTimeRowsBoundedPrecedingFunction<>( - config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), - genAggsHandler, - flattenAccTypes, - fieldTypes, - precedingOffset, - rowTimeIdx); - } else { - return new RowTimeRangeBoundedPrecedingFunction<>( - genAggsHandler, flattenAccTypes, fieldTypes, precedingOffset, rowTimeIdx); - } - } else { - if (isRowsClause) { - return new ProcTimeRowsBoundedPrecedingFunction<>( - config.getStateRetentionTime(), - TableConfigUtils.getMaxIdleStateRetentionTime(config), - genAggsHandler, - flattenAccTypes, - fieldTypes, - precedingOffset); - } else { - return new ProcTimeRangeBoundedPrecedingFunction<>( - genAggsHandler, flattenAccTypes, fieldTypes, precedingOffset); - } + switch (timeAttribute) { + case ROW_TIME: + final int rowTimeIdx = orderKeys[0]; + if (isRowsClause) { + return new RowTimeRowsBoundedPrecedingFunction<>( + config.getStateRetentionTime(), + TableConfigUtils.getMaxIdleStateRetentionTime(config), + genAggsHandler, + flattenAccTypes, + fieldTypes, + precedingOffset, + rowTimeIdx); + } else { + return new RowTimeRangeBoundedPrecedingFunction<>( + genAggsHandler, + flattenAccTypes, + fieldTypes, + precedingOffset, + rowTimeIdx); + } + case PROC_TIME: + if (isRowsClause) { + return new ProcTimeRowsBoundedPrecedingFunction<>( + config.getStateRetentionTime(), + TableConfigUtils.getMaxIdleStateRetentionTime(config), + genAggsHandler, + flattenAccTypes, + fieldTypes, + precedingOffset); + } else { + return new ProcTimeRangeBoundedPrecedingFunction<>( + genAggsHandler, flattenAccTypes, fieldTypes, precedingOffset); + } + case NON_TIME: + throw new TableException( + "Non-time attribute sort is not supported for bounded over aggregate"); + default: + throw new TableException("Unsupported bounded operation"); } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 6559ed1bbd331..c502524195405 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.catalog.{ManagedTableListener, ResolvedCatalogBaseTable} import 
org.apache.flink.table.connector.ChangelogMode +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.`trait`._ import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER} import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery @@ -247,9 +248,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize") createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) + case over: StreamPhysicalOverAggregate => + // OverAggregate requires insert-only input when sorted on a row-time/proc-time attribute + var overRequiredTrait = ModifyKindSetTrait.INSERT_ONLY + val builder = ModifyKindSet + .newBuilder() + .addContainedKind(ModifyKind.INSERT) + + // All aggregates are computed over the same window, and ORDER BY supports only one field + val orderKeyIndex = + over.logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex + val orderKeyType = over.logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType + if ( + !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType) + && !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType) + ) { + // Only a sort on a non-time attribute can consume and produce updates and deletes + builder.addContainedKind(ModifyKind.UPDATE) + builder.addContainedKind(ModifyKind.DELETE) + overRequiredTrait = ModifyKindSetTrait.ALL_CHANGES + } + val children = visitChildren(over, overRequiredTrait) + val providedTrait = new ModifyKindSetTrait(builder.build()) + createNewNode(over, children, providedTrait, requiredTrait, requester) + case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin | - _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate => - // TemporalSort, OverAggregate, IntervalJoin only support consuming insert-only + _: StreamPhysicalPythonOverAggregate => + // TemporalSort and IntervalJoin only support consuming insert-only // and producing insert-only changes val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY) createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester) @@ -469,8 +494,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate | _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate | _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase | - _: StreamPhysicalWindowAggregate => - // Aggregate, TableAggregate, Limit, GroupWindowAggregate, WindowAggregate, + _: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate => + // Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate, // and WindowTableAggregate requires update_before if there are updates val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0))) val children = visitChildren(rel, requiredChildTrait) @@ -478,10 +503,9 @@ createNewNode(rel, children, requiredTrait) case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate | - _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | - _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin | + _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin | _: StreamPhysicalPythonOverAggregate | 
_: StreamPhysicalWindowJoin => - // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate, + // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, // and IntervalJoin, WindowJoin require nothing about UpdateKind. val children = visitChildren(rel, UpdateKindTrait.NONE) createNewNode(rel, children, requiredTrait) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/OverAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/OverAggregateTestPrograms.java index 748dd9c5b1d55..757b1f2fc8f53 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/OverAggregateTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/OverAggregateTestPrograms.java @@ -24,6 +24,7 @@ import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import static org.apache.flink.table.api.config.TableConfigOptions.LOCAL_TIME_ZONE; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java index cb681e4bff514..4ef3012f61731 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java @@ -20,10 +20,11 @@ import org.apache.flink.FlinkVersion; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.common.OverAggregateTestPrograms; import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; import org.apache.flink.table.test.program.TableTestProgram; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.stream.Stream; @@ -36,7 +37,7 @@ public OverWindowRestoreTest() { @Override protected Stream getSavepointPaths( TableTestProgram program, ExecNodeMetadata metadata) { - if (metadata.version() == 1) { + if (metadata.version() == 1 && program.equals(OverWindowTestPrograms.LAG_OVER_FUNCTION)) { return Stream.concat( super.getSavepointPaths(program, metadata), // See src/test/resources/restore-tests/stream-exec-over-aggregate_1/over @@ -50,6 +51,11 @@ protected Stream getSavepointPaths( @Override public List programs() { - return Collections.singletonList(OverWindowTestPrograms.LAG_OVER_FUNCTION); + return Arrays.asList( + OverWindowTestPrograms.LAG_OVER_FUNCTION, + OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_RETRACT_MODE, + OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE, + OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS, + OverWindowTestPrograms.OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java index 
e9fdbb03a427c..0fb02fae339f9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowTestPrograms.java @@ -22,6 +22,7 @@ import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.util.Collections; @@ -58,4 +59,275 @@ public class OverWindowTestPrograms { "INSERT INTO sink_t SELECT ts, LAG(b, 1) over (order by r_time) AS " + "bLag FROM t") .build(); + + static final TableTestProgram OVER_AGGREGATE_NON_TIME_UNBOUNDED_RETRACT_MODE = + TableTestProgram.of( + "over-aggregate-sum-retract-mode", + "validates restoring an unbounded preceding sum function in retract mode") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("key STRING", "val BIGINT", "ts BIGINT") + .addOption("changelog-mode", "I,UB,UA") + .producedBeforeRestore( + Row.of("key1", 1L, 100L), + Row.of("key1", 2L, 200L), + Row.of("key1", 5L, 500L), + Row.of("key1", 6L, 600L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 2L, 200L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 3L, 200L), + Row.of("key2", 1L, 100L), + Row.of("key2", 2L, 200L)) + .producedAfterRestore( + Row.of("key3", 1L, 100L), + Row.of("key1", 4L, 400L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 3L, 200L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 3L, 300L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "key STRING", + "val BIGINT", + "ts BIGINT", + "sum_val BIGINT") + .consumedBeforeRestore( + Row.of("key1", 1L, 100L, 1L), + Row.of("key1", 2L, 200L, 3L), + Row.of("key1", 5L, 500L, 8L), + Row.of("key1", 6L, 600L, 14L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 6L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 12L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 3L, 200L, 4L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 6L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 9L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 12L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 15L), + Row.of("key2", 1L, 100L, 1L), + Row.of("key2", 2L, 200L, 3L)) + .consumedAfterRestore( + Row.of("key3", 1L, 100L, 1L), + Row.of("key1", 4L, 400L, 8L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 9L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 13L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 15L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 19L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 3L, 200L, 4L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 4L, 400L, 8L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 4L, 400L, 5L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 13L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 10L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 19L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 16L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 3L, 300L, 4L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 4L, 400L, 5L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 4L, 400L, 8L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 10L), + Row.ofKind(RowKind.UPDATE_AFTER, 
"key1", 5L, 500L, 13L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 16L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 19L)) + .build()) + .runSql( + "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (" + + "PARTITION BY key " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + "FROM source_t") + .build(); + + static final TableTestProgram OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE = + TableTestProgram.of( + "over-aggregate-sum-append-mode", + "validates restoring an unbounded preceding sum function in append mode") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("key STRING", "val BIGINT", "ts BIGINT") + .addOption("changelog-mode", "I") + .producedBeforeRestore( + Row.of("key1", 1L, 100L), + Row.of("key1", 2L, 200L), + Row.of("key1", 5L, 500L), + Row.of("key1", 6L, 600L), + Row.of("key2", 1L, 100L), + Row.of("key2", 2L, 200L)) + .producedAfterRestore(Row.of("key1", 4L, 400L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "key STRING", + "val BIGINT", + "ts BIGINT", + "sum_val BIGINT") + .consumedBeforeRestore( + Row.of("key1", 1L, 100L, 1L), + Row.of("key1", 2L, 200L, 3L), + Row.of("key1", 5L, 500L, 8L), + Row.of("key1", 6L, 600L, 14L), + Row.of("key2", 1L, 100L, 1L), + Row.of("key2", 2L, 200L, 3L)) + .consumedAfterRestore( + Row.of("key1", 4L, 400L, 7L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 12L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 18L)) + .build()) + .runSql( + "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (" + + "PARTITION BY key " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + "FROM source_t") + .build(); + + static final TableTestProgram OVER_AGGREGATE_NON_TIME_UNBOUNDED_APPEND_MODE_MULTIPLE_AGGS = + TableTestProgram.of( + "over-aggregate-sum-append-mode-multiple-aggs", + "validates restoring an unbounded preceding sum function in append mode") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("key STRING", "val BIGINT", "ts BIGINT") + .addOption("changelog-mode", "I") + .producedBeforeRestore( + Row.of("key1", 1L, 100L), + Row.of("key1", 2L, 200L), + Row.of("key1", 5L, 500L), + Row.of("key1", 6L, 600L), + Row.of("key2", 1L, 100L), + Row.of("key2", 2L, 200L)) + .producedAfterRestore(Row.of("key1", 4L, 400L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "key STRING", + "val BIGINT", + "ts BIGINT", + "sum_val BIGINT", + "cnt_key BIGINT") + .consumedBeforeRestore( + Row.of("key1", 1L, 100L, 1L, 1L), + Row.of("key1", 2L, 200L, 3L, 2L), + Row.of("key1", 5L, 500L, 8L, 3L), + Row.of("key1", 6L, 600L, 14L, 4L), + Row.of("key2", 1L, 100L, 1L, 1L), + Row.of("key2", 2L, 200L, 3L, 2L)) + .consumedAfterRestore( + Row.of("key1", 4L, 400L, 7L, 3L), + Row.ofKind( + RowKind.UPDATE_BEFORE, + "key1", + 5L, + 500L, + 8L, + 3L), + Row.ofKind( + RowKind.UPDATE_AFTER, + "key1", + 5L, + 500L, + 12L, + 4L), + Row.ofKind( + RowKind.UPDATE_BEFORE, + "key1", + 6L, + 600L, + 14L, + 4L), + Row.ofKind( + RowKind.UPDATE_AFTER, + "key1", + 6L, + 600L, + 18L, + 5L)) + .build()) + .runSql( + "INSERT INTO sink_t SELECT key, val, ts, " + + "SUM(val) OVER (" + + "PARTITION BY key " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val, " + + "COUNT(key) OVER (" + + 
"PARTITION BY key " + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS cnt_key " + + "FROM source_t") + .build(); + + static final TableTestProgram OVER_AGGREGATE_NON_TIME_UNBOUNDED_NO_PARTITION_BY = + TableTestProgram.of( + "over-aggregate-sum-no-partition-by", + "validates restoring an unbounded preceding sum function without partition by") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("key STRING", "val BIGINT", "ts BIGINT") + .addOption("changelog-mode", "I") + .producedBeforeRestore( + Row.of("key1", 1L, 100L), + Row.of("key1", 2L, 200L), + Row.of("key1", 5L, 500L), + Row.of("key1", 6L, 600L), + Row.of("key2", 1L, 100L), + Row.of("key2", 2L, 200L)) + .producedAfterRestore(Row.of("key1", 4L, 400L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "key STRING", + "val BIGINT", + "ts BIGINT", + "sum_val BIGINT") + .consumedBeforeRestore( + Row.of("key1", 1L, 100L, 1L), + Row.of("key1", 2L, 200L, 3L), + Row.of("key1", 5L, 500L, 8L), + Row.of("key1", 6L, 600L, 14L), + Row.of("key2", 1L, 100L, 2L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 2L, 200L, 3L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 2L, 200L, 4L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 8L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 9L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 14L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 15L), + Row.of("key2", 2L, 200L, 6L), + Row.ofKind(RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 9L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 11L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 15L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 17L)) + .consumedAfterRestore( + Row.of("key1", 4L, 400L, 10L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 5L, 500L, 11L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 5L, 500L, 15L), + Row.ofKind( + RowKind.UPDATE_BEFORE, "key1", 6L, 600L, 17L), + Row.ofKind(RowKind.UPDATE_AFTER, "key1", 6L, 600L, 21L)) + .build()) + .runSql( + "INSERT INTO sink_t SELECT key, val, ts, SUM(val) OVER (" + + "ORDER BY val " + + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " + + "AS sum_val " + + "FROM source_t") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/plan/over-aggregate-sum-append-mode-multiple-aggs.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/plan/over-aggregate-sum-append-mode-multiple-aggs.json new file mode 100644 index 0000000000000..f4c8df6a71a13 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/plan/over-aggregate-sum-append-mode-multiple-aggs.json @@ -0,0 +1,243 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 11, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : 
"TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts])", + "inputProperties" : [ ] + }, { + "id" : 12, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "Exchange(distribution=[hash[key]])" + }, { + "id" : 13, + "type" : "stream-exec-over-aggregate_1", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "w0$o1", + "internalName" : "$$SUM0$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "w0$o2", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 0 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ] + } ], + "constants" : [ ], + "originalInputFields" : 3 + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL, `w0$o2` BIGINT NOT NULL>", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1, COUNT(key) AS w0$o2])" + }, { + "id" : 14, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CASE$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 0, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT, `cnt_key` BIGINT NOT NULL>", + "description" : "Calc(select=[key, val, ts, CASE((w0$o0 > 0), w0$o1, null:BIGINT) AS sum_val, w0$o2 AS cnt_key])" + }, { + "id" : 15, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + 
"table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + }, { + "name" : "sum_val", + "dataType" : "BIGINT" + }, { + "name" : "cnt_key", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT, `cnt_key` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[key, val, ts, sum_val, cnt_key])" + } ], + "edges" : [ { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/savepoint/_metadata new file mode 100644 index 0000000000000..71041e24cf783 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode-multiple-aggs/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/plan/over-aggregate-sum-append-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/plan/over-aggregate-sum-append-mode.json new file mode 100644 index 0000000000000..b8f297962e64e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/plan/over-aggregate-sum-append-mode.json @@ -0,0 +1,226 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 6, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts])", + "inputProperties" : [ 
] + }, { + "id" : 7, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "Exchange(distribution=[hash[key]])" + }, { + "id" : 8, + "type" : "stream-exec-over-aggregate_1", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "w0$o1", + "internalName" : "$$SUM0$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ] + } ], + "constants" : [ ], + "originalInputFields" : 3 + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])" + }, { + "id" : 9, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CASE$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 0, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Calc(select=[key, val, ts, CASE((w0$o0 > 0), w0$o1, null:BIGINT) AS sum_val])" + }, { + "id" : 10, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + }, { + "name" : "sum_val", + "dataType" 
: "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[key, val, ts, sum_val])" + } ], + "edges" : [ { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/savepoint/_metadata new file mode 100644 index 0000000000000..6450dc145543a Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-append-mode/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/plan/over-aggregate-sum-no-partition-by.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/plan/over-aggregate-sum-no-partition-by.json new file mode 100644 index 0000000000000..472b75092fe2d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/plan/over-aggregate-sum-no-partition-by.json @@ -0,0 +1,225 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 16, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts])", + "inputProperties" : [ ] + }, { + "id" : 17, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "SINGLETON" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "Exchange(distribution=[single])" + }, { + "id" : 18, + "type" : "stream-exec-over-aggregate_1", + "overSpec" : { + "partition" : { + "fields" : [ ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + 
"kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "w0$o1", + "internalName" : "$$SUM0$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ] + } ], + "constants" : [ ], + "originalInputFields" : 3 + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", + "description" : "OverAggregate(orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])" + }, { + "id" : 19, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CASE$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 0, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Calc(select=[key, val, ts, CASE((w0$o0 > 0), w0$o1, null:BIGINT) AS sum_val])" + }, { + "id" : 20, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + }, { + "name" : "sum_val", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[key, val, ts, sum_val])" + } ], + "edges" : [ { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : 
"FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/savepoint/_metadata new file mode 100644 index 0000000000000..324d333fd06a0 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-no-partition-by/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/plan/over-aggregate-sum-retract-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/plan/over-aggregate-sum-retract-mode.json new file mode 100644 index 0000000000000..f274948cd7b58 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/plan/over-aggregate-sum-retract-mode.json @@ -0,0 +1,226 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[key, val, ts])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT>", + "description" : "Exchange(distribution=[hash[key]])" + }, { + "id" : 3, + "type" : "stream-exec-over-aggregate_1", + "overSpec" : { + "partition" : { + "fields" : [ 0 ] + }, + "groups" : [ { + "orderBy" : { + "fields" : [ { + "index" : 1, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "isRows" : false, + "lowerBound" : { + "kind" : "UNBOUNDED_PRECEDING" + }, + "upperBound" : { + "kind" : "CURRENT_ROW" + }, + "aggCalls" : [ { + "name" : "w0$o0", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "w0$o1", + "internalName" : "$$SUM0$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ] + } ], + "constants" : [ ], + "originalInputFields" : 3 + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" 
: "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])" + }, { + "id" : 4, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CASE$1", + "operands" : [ { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : 0, + "type" : "BIGINT NOT NULL" + } ], + "type" : "BOOLEAN NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "BIGINT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Calc(select=[key, val, ts, CASE((w0$o0 > 0), w0$o1, null:BIGINT) AS sum_val])" + }, { + "id" : 5, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "key", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "val", + "dataType" : "BIGINT" + }, { + "name" : "ts", + "dataType" : "BIGINT" + }, { + "name" : "sum_val", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `sum_val` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[key, val, ts, sum_val])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/savepoint/_metadata new file mode 100644 index 0000000000000..c15ab4d9a44ac Binary 
files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-sum-retract-mode/savepoint/_metadata differ diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunction.java new file mode 100644 index 0000000000000..c362d38a681f8 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunction.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.over; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import org.apache.flink.table.runtime.generated.AggsHandleFunction; +import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.ListIterator; + +/** A basic implementation to support non-time unbounded over aggregate with retract mode. */ +public class NonTimeUnboundedPrecedingFunction<K> + extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(NonTimeUnboundedPrecedingFunction.class); + + private final GeneratedAggsHandleFunction genAggsHandler; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordComparator generatedRecordComparator; + + // The util to compare two rows based on the sort attribute.
+ private transient Comparator<RowData> sortKeyComparator; + // The record equaliser used to check whether two rows are equal. + private transient RecordEqualiser equaliser; + + private final LogicalType[] accTypes; + private final LogicalType[] inputFieldTypes; + protected transient JoinedRowData output; + + // state to hold the accumulators of the aggregations + private transient ValueState<RowData> accState; + // state to hold rows until the state TTL expires + private transient ValueState<List<RowData>> inputState; + + protected transient AggsHandleFunction currFunction; + protected transient AggsHandleFunction prevFunction; + + public NonTimeUnboundedPrecedingFunction( + long minRetentionTime, + long maxRetentionTime, + GeneratedAggsHandleFunction genAggsHandler, + GeneratedRecordEqualiser genRecordEqualiser, + GeneratedRecordComparator genRecordComparator, + LogicalType[] accTypes, + LogicalType[] inputFieldTypes) { + super(minRetentionTime, maxRetentionTime); + this.genAggsHandler = genAggsHandler; + this.generatedRecordEqualiser = genRecordEqualiser; + this.generatedRecordComparator = genRecordComparator; + this.accTypes = accTypes; + this.inputFieldTypes = inputFieldTypes; + } + + @Override + public void open(OpenContext openContext) throws Exception { + // Initialize agg functions + currFunction = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader()); + currFunction.open(new PerKeyStateDataViewStore(getRuntimeContext())); + + prevFunction = genAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader()); + prevFunction.open(new PerKeyStateDataViewStore(getRuntimeContext())); + + // Initialize output record + output = new JoinedRowData(); + + // Initialize record equaliser + equaliser = + generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + // Initialize sort comparator + sortKeyComparator = + generatedRecordComparator.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + // Initialize accumulator state + InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes); + ValueStateDescriptor<RowData> accStateDesc = + new ValueStateDescriptor<RowData>("accState", accTypeInfo); + accState = getRuntimeContext().getState(accStateDesc); + + // Input elements are all binary rows as they come from the network + InternalTypeInfo<RowData> inputType = InternalTypeInfo.ofFields(inputFieldTypes); + ListTypeInfo<RowData> rowListTypeInfo = new ListTypeInfo<RowData>(inputType); + + ValueStateDescriptor<List<RowData>> inputStateDescriptor = + new ValueStateDescriptor<List<RowData>>("inputState", rowListTypeInfo); + + // Initialize state which maintains records in sorted (ASC) order + inputState = getRuntimeContext().getState(inputStateDescriptor); + + initCleanupTimeState("NonTimeUnboundedPrecedingFunctionCleanupTime"); + } + + /** + * Inserts or retracts the input row in the sorted list kept in state, depending on its {@link + * RowKind}, and emits updated aggregate values for every row affected by the change. + * + * @param input The input value. + * @param ctx A {@link Context} that allows querying the timestamp of the element and getting + * TimerService for registering timers and querying the time. The context is only valid + * during the invocation of this method, do not store it. + * @param out The collector for returning result values.
+ * @throws Exception + */ + @Override + public void processElement( + RowData input, + KeyedProcessFunction<K, RowData, RowData>.Context ctx, + Collector<RowData> out) + throws Exception { + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()); + + // get last accumulator + RowData lastAccumulatorCurr = accState.value(); + if (lastAccumulatorCurr == null) { + // initialize accumulator + lastAccumulatorCurr = currFunction.createAccumulators(); + } + // set accumulator in function context first + currFunction.setAccumulators(lastAccumulatorCurr); + + // get last accumulator + RowData lastAccumulatorPrev = accState.value(); + if (lastAccumulatorPrev == null) { + // initialize accumulator + lastAccumulatorPrev = prevFunction.createAccumulators(); + } + // set accumulator in function context first + prevFunction.setAccumulators(lastAccumulatorPrev); + + RowKind rowKind = input.getRowKind(); + + if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) { + insertIntoSortedList(input, out); + } else if (rowKind == RowKind.DELETE || rowKind == RowKind.UPDATE_BEFORE) { + removeFromSortedList(input, out); + } + + // Reset acc state since we can have out of order inserts into the ordered list + currFunction.resetAccumulators(); + prevFunction.resetAccumulators(); + currFunction.cleanup(); + prevFunction.cleanup(); + } + + private void insertIntoSortedList(RowData rowData, Collector<RowData> out) throws Exception { + List<RowData> rowList = inputState.value(); + if (rowList == null) { + rowList = new ArrayList<>(); + } + boolean isInserted = false; + RowKind origRowKind = rowData.getRowKind(); + rowData.setRowKind(RowKind.INSERT); + ListIterator<RowData> iterator = rowList.listIterator(); + + while (iterator.hasNext()) { + RowData curRow = iterator.next(); + if (sortKeyComparator.compare(curRow, rowData) > 0) { + iterator.previous(); + iterator.add(rowData); + isInserted = true; + break; + } + currFunction.accumulate(curRow); + prevFunction.accumulate(curRow); + } + + // Add to the end of the list + if (!isInserted) { + iterator.add(rowData); + } + + // Only accumulate rowData with currFunction + currFunction.accumulate(rowData); + + // Update state with the newly inserted row + inputState.update(rowList); + + // prepare output row + output.setRowKind(origRowKind); + output.replace(rowData, currFunction.getValue()); + out.collect(output); + + // Emit updated agg value for all records after newly inserted row + while (iterator.hasNext()) { + RowData curRow = iterator.next(); + currFunction.accumulate(curRow); + prevFunction.accumulate(curRow); + // Generate UPDATE_BEFORE + output.setRowKind(RowKind.UPDATE_BEFORE); + output.replace(curRow, prevFunction.getValue()); + out.collect(output); + // Generate UPDATE_AFTER + output.setRowKind(RowKind.UPDATE_AFTER); + output.replace(curRow, currFunction.getValue()); + out.collect(output); + } + } + + private void removeFromSortedList(RowData rowData, Collector<RowData> out) throws Exception { + boolean isRetracted = false; + rowData.setRowKind(RowKind.INSERT); + List<RowData> rowList = inputState.value(); + ListIterator<RowData> iterator = rowList.listIterator(); + + while (iterator.hasNext()) { + RowData curRow = iterator.next(); + currFunction.accumulate(curRow); + prevFunction.accumulate(curRow); + if (isRetracted) { + // Emit updated agg value for all records after retraction + output.setRowKind(RowKind.UPDATE_BEFORE); + output.replace(curRow, prevFunction.getValue()); + out.collect(output); + + output.setRowKind(RowKind.UPDATE_AFTER); + output.replace(curRow,
currFunction.getValue()); + out.collect(output); + } else if (equaliser.equals(curRow, rowData)) { + // Retract record + output.setRowKind(RowKind.UPDATE_BEFORE); + output.replace(rowData, currFunction.getValue()); + out.collect(output); + iterator.remove(); + currFunction.retract(curRow); + isRetracted = true; + } + } + + // Update state without the retracted row + inputState.update(rowList); + } + + @Override + public void close() throws Exception { + if (null != currFunction) { + currFunction.close(); + } + if (null != prevFunction) { + prevFunction.close(); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/TimeAttribute.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/TimeAttribute.java new file mode 100644 index 0000000000000..34522bfe63560 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/TimeAttribute.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.over; + +/** Type of time attribute supported by the OverAggregate operator. */ +public enum TimeAttribute { + ROW_TIME, + PROC_TIME, + NON_TIME +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunctionTest.java new file mode 100644 index 0000000000000..9092d69061925 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/NonTimeUnboundedPrecedingFunctionTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.operators.over; + +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.types.RowKind; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link NonTimeUnboundedPrecedingFunction}. */ +public class NonTimeUnboundedPrecedingFunctionTest extends RowTimeOverWindowTestBase { + + private static final GeneratedRecordComparator GENERATED_SORT_KEY_COMPARATOR = + new GeneratedRecordComparator("", "", new Object[0]) { + + private static final long serialVersionUID = 1L; + + @Override + public RecordComparator newInstance(ClassLoader classLoader) { + return new LongRecordComparator(1); + } + }; + + private static final GeneratedRecordEqualiser GENERATED_RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + + private static final long serialVersionUID = 1L; + + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + // return new RowDataRecordEqualiser(); + return new TestRecordEqualiser(); + } + }; + + /** Custom test comparator for comparing numbers. */ + public static class LongRecordComparator implements RecordComparator { + + private int pos; + + public LongRecordComparator(int pos) { + this.pos = pos; + } + + @Override + public int compare(RowData o1, RowData o2) { + boolean null0At1 = o1.isNullAt(pos); + boolean null0At2 = o2.isNullAt(pos); + int cmp0 = + null0At1 && null0At2 + ? 0 + : (null0At1 + ? -1 + : (null0At2 + ? 1 + : Long.compare(o1.getLong(pos), o2.getLong(pos)))); + if (cmp0 != 0) { + return cmp0; + } + return 0; + } + } + + /** Custom test row equaliser for comparing rows. 
*/ + public static class TestRecordEqualiser implements RecordEqualiser { + + private static final long serialVersionUID = -6706336100425614942L; + + @Override + public boolean equals(RowData row1, RowData row2) { + if (row1 instanceof BinaryRowData && row2 instanceof BinaryRowData) { + return row1.equals(row2); + } else if (row1 instanceof GenericRowData && row2 instanceof GenericRowData) { + return row1.getString(0).equals(row2.getString(0)) + && row1.getLong(1) == row2.getLong(1) + && row1.getLong(2) == row2.getLong(2); + } else { + throw new UnsupportedOperationException(); + } + } + } + + @Test + public void testInsertOnlyRecordsWithCustomSortKey() throws Exception { + NonTimeUnboundedPrecedingFunction function = + new NonTimeUnboundedPrecedingFunction( + 0, + 2000, + aggsHandleFunction, + GENERATED_RECORD_EQUALISER, + GENERATED_SORT_KEY_COMPARATOR, + accTypes, + inputFieldTypes) {}; + KeyedProcessOperator operator = + new KeyedProcessOperator<>(function); + + OneInputStreamOperatorTestHarness testHarness = + createTestHarness(operator); + + testHarness.open(); + + // put some records + testHarness.processElement(insertRecord("key1", 1L, 100L)); + testHarness.processElement(insertRecord("key1", 2L, 200L)); + testHarness.processElement(insertRecord("key1", 5L, 500L)); + testHarness.processElement(insertRecord("key1", 6L, 600L)); + testHarness.processElement(insertRecord("key2", 1L, 100L)); + testHarness.processElement(insertRecord("key2", 2L, 200L)); + + testHarness.processWatermark(new Watermark(500L)); + + // out of order record should trigger updates for all records after its inserted position + testHarness.processElement(insertRecord("key1", 4L, 400L)); + + List expectedRows = + Arrays.asList( + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 1L, 100L), + GenericRowData.ofKind(RowKind.INSERT, 1L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 2L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 3L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 14L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key2"), 1L, 100L), + GenericRowData.ofKind(RowKind.INSERT, 1L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key2"), 2L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 3L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 7L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.UPDATE_BEFORE, + StringData.fromString("key1"), + 5L, + 500L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.UPDATE_AFTER, + StringData.fromString("key1"), + 5L, + 500L), + GenericRowData.ofKind(RowKind.INSERT, 12L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.UPDATE_BEFORE, + StringData.fromString("key1"), + 6L, + 600L), + GenericRowData.ofKind(RowKind.INSERT, 14L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.UPDATE_AFTER, + StringData.fromString("key1"), + 6L, + 600L), + GenericRowData.ofKind(RowKind.INSERT, 
18L))); + + List actualRows = testHarness.extractOutputValues(); + + validateRows(actualRows, expectedRows); + } + + @Test + public void testRetractingRecordsWithCustomSortKey() throws Exception { + NonTimeUnboundedPrecedingFunction function = + new NonTimeUnboundedPrecedingFunction( + 0, + 2000, + aggsHandleFunction, + GENERATED_RECORD_EQUALISER, + GENERATED_SORT_KEY_COMPARATOR, + accTypes, + inputFieldTypes) {}; + KeyedProcessOperator operator = + new KeyedProcessOperator<>(function); + + OneInputStreamOperatorTestHarness testHarness = + createTestHarness(operator); + + testHarness.open(); + + // put some records + testHarness.processElement(insertRecord("key1", 1L, 100L)); + testHarness.processElement(insertRecord("key1", 2L, 200L)); + testHarness.processElement(insertRecord("key1", 5L, 500L)); + testHarness.processElement(insertRecord("key1", 6L, 600L)); + testHarness.processElement(updateBeforeRecord("key1", 2L, 200L)); + testHarness.processElement(updateAfterRecord("key1", 3L, 200L)); + testHarness.processElement(insertRecord("key2", 1L, 100L)); + testHarness.processElement(insertRecord("key2", 2L, 200L)); + testHarness.processElement(insertRecord("key3", 1L, 100L)); + testHarness.processElement(insertRecord("key1", 4L, 400L)); + testHarness.processElement(updateBeforeRecord("key1", 3L, 200L)); + testHarness.processElement(updateAfterRecord("key1", 3L, 300L)); + + // Watermark has no impact and should be ignored + testHarness.processWatermark(new Watermark(500L)); + + List expectedRows = + Arrays.asList( + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 1L, 100L), + GenericRowData.ofKind(RowKind.INSERT, 1L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 2L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 3L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 14L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 2L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 3L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 6L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 14L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 12L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.UPDATE_AFTER, + StringData.fromString("key1"), + 3L, + 200L), + GenericRowData.ofKind(RowKind.INSERT, 4L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 6L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + 
GenericRowData.ofKind(RowKind.INSERT, 9L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 12L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 15L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key2"), 1L, 100L), + GenericRowData.ofKind(RowKind.INSERT, 1L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key2"), 2L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 3L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key3"), 1L, 100L), + GenericRowData.ofKind(RowKind.INSERT, 1L)), + new JoinedRowData( + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 9L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 13L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 15L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 19L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 3L, 200L), + GenericRowData.ofKind(RowKind.INSERT, 4L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 5L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 13L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 10L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 19L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 16L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.UPDATE_AFTER, + StringData.fromString("key1"), + 3L, + 300L), + GenericRowData.ofKind(RowKind.INSERT, 4L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 5L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 4L, 400L), + GenericRowData.ofKind(RowKind.INSERT, 8L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + 
GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 10L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 5L, 500L), + GenericRowData.ofKind(RowKind.INSERT, 13L)), + new JoinedRowData( + RowKind.UPDATE_BEFORE, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 16L)), + new JoinedRowData( + RowKind.UPDATE_AFTER, + GenericRowData.ofKind( + RowKind.INSERT, StringData.fromString("key1"), 6L, 600L), + GenericRowData.ofKind(RowKind.INSERT, 19L))); + + List actualRows = testHarness.extractOutputValues(); + + validateRows(actualRows, expectedRows); + } + + private void validateRows(List actualRows, List expectedRows) { + // Validate size of rows emitted + assertThat(actualRows.size()).isEqualTo(expectedRows.size()); + + // Validate the contents of rows emitted + for (int i = 0; i < actualRows.size(); i++) { + assertThat(actualRows.get(i).getRowKind()).isEqualTo(expectedRows.get(i).getRowKind()); + assertThat(actualRows.get(i).getString(0)).isEqualTo(expectedRows.get(i).getString(0)); + assertThat(actualRows.get(i).getLong(1)).isEqualTo(expectedRows.get(i).getLong(1)); + assertThat(actualRows.get(i).getLong(2)).isEqualTo(expectedRows.get(i).getLong(2)); + // Aggregated value + assertThat(actualRows.get(i).getLong(3)).isEqualTo(expectedRows.get(i).getLong(3)); + } + } +}
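
For context, the restore-test plan above corresponds to an OVER aggregate whose ORDER BY key is the plain BIGINT column val rather than a time attribute. The sketch below shows, under stated assumptions, how such a query could be submitted from Java; the source/sink table definitions and the datagen/print connectors are illustrative only and are not part of this change.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NonTimeOverAggregateExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical source and sink tables, kept minimal so the sketch is self-contained.
        tEnv.executeSql(
                "CREATE TABLE source_t (key STRING, val BIGINT, ts BIGINT) "
                        + "WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "CREATE TABLE sink_t (key STRING, val BIGINT, ts BIGINT, sum_val BIGINT) "
                        + "WITH ('connector' = 'print')");
        // ORDER BY val is not a time attribute; with this change the planner executes the
        // OVER aggregate in retract mode via NonTimeUnboundedPrecedingFunction instead of
        // rejecting the query. The default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND
        // CURRENT ROW, matching the plan above.
        tEnv.executeSql(
                "INSERT INTO sink_t "
                        + "SELECT key, val, ts, "
                        + "    SUM(val) OVER (PARTITION BY key ORDER BY val) AS sum_val "
                        + "FROM source_t");
    }
}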