From 4ff9ab5fe08df488fa3fa9f9ae962df5beb1c3f9 Mon Sep 17 00:00:00 2001 From: fredia Date: Mon, 16 Dec 2024 16:48:22 +0800 Subject: [PATCH] [FLINK-35904][test] Test harness for async state processing operators --- .../AbstractAsyncStateStreamOperatorTest.java | 199 +++++++------ ...edMultiInputStreamOperatorTestHarness.java | 119 ++++++++ ...eyedOneInputStreamOperatorTestHarness.java | 186 ++++++++++++ ...eyedTwoInputStreamOperatorTestHarness.java | 150 ++++++++++ ...bstractAsyncStateStreamOperatorV2Test.java | 274 +++++++++++++----- 5 files changed, 767 insertions(+), 161 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java index 4cda2f323020e..a87a3a8bf323c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.StateRequestType; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; @@ -33,19 +32,22 @@ import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend; @@ -54,14 +56,14 @@ /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ public class AbstractAsyncStateStreamOperatorTest { - protected KeyedOneInputStreamOperatorTestHarness, String> + protected AsyncKeyedOneInputStreamOperatorTestHarness, String> createTestHarness( int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception { TestOperator testOperator = new TestOperator(elementOrder); - KeyedOneInputStreamOperatorTestHarness, String> + AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + new AsyncKeyedOneInputStreamOperatorTestHarness<>( testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, @@ -74,7 +76,7 @@ public class AbstractAsyncStateStreamOperatorTest { @Test void testCreateAsyncExecutionController() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); assertThat(testHarness.getOperator()) @@ -93,21 +95,12 @@ void testCreateAsyncExecutionController() throws Exception { @Test void testRecordProcessorWithFirstStateOrder() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER)) { testHarness.open(); TestOperator testOperator = (TestOperator) testHarness.getOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -115,29 +108,19 @@ void testRecordProcessorWithFirstStateOrder() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); } } @Test void testRecordProcessorWithRecordOrder() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); TestOperator testOperator = (TestOperator) testHarness.getOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -147,8 +130,7 @@ void testRecordProcessorWithRecordOrder() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); } } @@ -157,9 +139,9 @@ void testRecordProcessorWithRecordOrder() throws Exception { void testAsyncProcessWithKey() throws Exception { TestOperatorWithDirectAsyncProcess testOperator = new TestOperatorWithDirectAsyncProcess(ElementOrder.RECORD_ORDER); - KeyedOneInputStreamOperatorTestHarness, String> + AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>( + new AsyncKeyedOneInputStreamOperatorTestHarness<>( testOperator, new TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, @@ -171,15 +153,8 @@ void testAsyncProcessWithKey() throws Exception { testHarness.open(); ThrowingConsumer>, Exception> processor = RecordProcessorUtils.getRecordProcessor(testOperator); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(0); @@ -189,8 +164,7 @@ void testAsyncProcessWithKey() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); // We don't have the mailbox executor actually running, so the new context is blocked @@ -203,11 +177,9 @@ void testAsyncProcessWithKey() throws Exception { @Test void testCheckpointDrain() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); - CheckpointStorageLocationReference locationReference = - CheckpointStorageLocationReference.getDefault(); AsyncExecutionController asyncExecutionController = ((AbstractAsyncStateStreamOperator) testHarness.getOperator()) .getAsyncExecutionController(); @@ -225,7 +197,7 @@ void testCheckpointDrain() throws Exception { @Test void testTimerServiceIsAsync() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); assertThat(testHarness.getOperator()) @@ -249,22 +221,13 @@ public void onProcessingTime(InternalTimer timer) throws Exception {} @Test void testNonRecordProcess() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); TestOperator testOperator = (TestOperator) testHarness.getOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.processLatencyMarker( - new LatencyMarker(1234, new OperatorID(), 0)); - } catch (Exception e) { - } - }); + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); + CompletableFuture future = + testHarness.processLatencyMarker(new LatencyMarker(1234, new OperatorID(), 0)); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -274,8 +237,7 @@ void testNonRecordProcess() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); assertThat(testOperator.getLatencyProcessed()).isEqualTo(1); } @@ -283,23 +245,16 @@ void testNonRecordProcess() throws Exception { @Test void testWatermarkStatus() throws Exception { - try (KeyedOneInputStreamOperatorTestHarness, String> + try (AsyncKeyedOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); TestOperator testOperator = (TestOperator) testHarness.getOperator(); ThrowingConsumer>, Exception> processor = RecordProcessorUtils.getRecordProcessor(testOperator); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.processWatermark1(new Watermark(205L)); - testOperator.processWatermark2(new Watermark(105L)); - testOperator.processWatermarkStatus(WatermarkStatus.IDLE, 1); - } catch (Exception e) { - } - }); + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); + testHarness.processWatermark(new Watermark(205L)); + CompletableFuture future = + testHarness.processWatermarkStatus(WatermarkStatus.IDLE); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -310,15 +265,57 @@ void testWatermarkStatus() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); assertThat(testOperator.watermarkStatus.isActive()).isFalse(); assertThat(testHarness.getOutput()) .containsExactly( new StreamRecord<>("EventTimer-5-105"), - new Watermark(105L), - new Watermark(205L)); + new Watermark(205L), + WatermarkStatus.IDLE); + } + } + + @Test + void testIdleWatermarkHandling() throws Exception { + final WatermarkTestingOperator testOperator = new WatermarkTestingOperator(); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = + new AsyncKeyedTwoInputStreamOperatorTestHarness<>( + testOperator, + dummyKeySelector, + dummyKeySelector, + BasicTypeInfo.INT_TYPE_INFO, + 1, + 1, + 0)) { + testHarness.setup(); + testHarness.open(); + testHarness.processElement1(1L, 1L).get(); + testHarness.processElement1(3L, 3L).get(); + testHarness.processElement1(4L, 4L).get(); + testHarness.processWatermark1(new Watermark(1L)).get(); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.processWatermarkStatus2(WatermarkStatus.IDLE).get(); + expectedOutput.add(new StreamRecord<>(1L)); + expectedOutput.add(new Watermark(1L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark1(new Watermark(3L)).get(); + expectedOutput.add(new StreamRecord<>(3L)); + expectedOutput.add(new Watermark(3L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE).get(); + // the other input is active now, we should not emit the watermark + testHarness.processWatermark1(new Watermark(4L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); } } @@ -432,6 +429,40 @@ public void processElement(StreamRecord> element) throws } } + private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator + implements TwoInputStreamOperator, + Triggerable { + + private transient InternalTimerService timerService; + + @Override + public void open() throws Exception { + super.open(); + + this.timerService = + getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + output.collect(new StreamRecord<>(timer.getTimestamp())); + } + + @Override + public void onProcessingTime(InternalTimer timer) + throws Exception {} + + @Override + public void processElement1(StreamRecord element) throws Exception { + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue()); + } + + @Override + public void processElement2(StreamRecord element) throws Exception { + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue()); + } + } + /** {@link KeySelector} for tests. */ public static class TestKeySelector implements KeySelector, Integer> { private static final long serialVersionUID = 1L; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java new file mode 100644 index 0000000000000..2ec26fcc9d54d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.asyncprocessing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A test harness for testing a {@link MultipleInputStreamOperator}. + * + *

All methods that interact with the operator need to be executed in another thread to simulate + * async processing. + */ +public class AsyncKeyedMultiInputStreamOperatorTestHarness + extends AbstractStreamOperatorTestHarness { + + /** The executor service for async state processing. */ + private ExecutorService executor; + + public AsyncKeyedMultiInputStreamOperatorTestHarness( + StreamOperatorFactory operatorFactory, + TypeInformation keyType, + List> keySelectors, + int maxParallelism, + int numSubtasks, + int subtaskIndex) + throws Exception { + super(operatorFactory, maxParallelism, numSubtasks, subtaskIndex); + config.setStateKeySerializer( + keyType.createSerializer(executionConfig.getSerializerConfig())); + config.serializeAllConfigs(); + for (int i = 0; i < keySelectors.size(); i++) { + setKeySelector(i, keySelectors.get(i)); + } + this.executor = Executors.newSingleThreadExecutor(); + } + + public void setKeySelector(int idx, KeySelector keySelector) { + ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + config.setStatePartitioner(idx, keySelector); + config.serializeAllConfigs(); + } + + public CompletableFuture processElement(int idx, StreamRecord element) + throws Exception { + Input input = getCastedOperator().getInputs().get(idx); + ThrowingConsumer, Exception> inputProcessor = + RecordProcessorUtils.getRecordProcessor(input); + return execute((ignore) -> inputProcessor.accept(element)); + } + + public CompletableFuture processWatermark(int idx, Watermark mark) throws Exception { + Input input = getCastedOperator().getInputs().get(idx); + return execute((ignore) -> input.processWatermark(mark)); + } + + public CompletableFuture processWatermarkStatus(int idx, WatermarkStatus watermarkStatus) + throws Exception { + Input input = getCastedOperator().getInputs().get(idx); + return execute((ignore) -> input.processWatermarkStatus(watermarkStatus)); + } + + public CompletableFuture processRecordAttributes( + int idx, RecordAttributes recordAttributes) throws Exception { + Input input = getCastedOperator().getInputs().get(idx); + return execute((ignore) -> input.processRecordAttributes(recordAttributes)); + } + + private MultipleInputStreamOperator getCastedOperator() { + return (MultipleInputStreamOperator) operator; + } + + private CompletableFuture execute(ThrowingConsumer processor) { + CompletableFuture future = new CompletableFuture(); + executor.execute( + () -> { + try { + processor.accept(null); + future.complete(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return future; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java new file mode 100644 index 0000000000000..649d8f76318e2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.asyncprocessing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A test harness for testing a {@link OneInputStreamOperator} which uses async state. + * + *

All methods that interact with the operator need to be executed in another thread to simulate + * async processing. + */ +public class AsyncKeyedOneInputStreamOperatorTestHarness + extends AbstractStreamOperatorTestHarness { + + /** Empty if the {@link #operator} is not {@link MultipleInputStreamOperator}. */ + private final List inputs = new ArrayList<>(); + + /** The executor service for async state processing. */ + private ExecutorService executor; + + public AsyncKeyedOneInputStreamOperatorTestHarness( + OneInputStreamOperator operator, + final KeySelector keySelector, + TypeInformation keyType, + int maxParallelism, + int numSubtasks, + int subtaskIndex) + throws Exception { + this( + SimpleOperatorFactory.of(operator), + keySelector, + keyType, + maxParallelism, + numSubtasks, + subtaskIndex); + } + + public AsyncKeyedOneInputStreamOperatorTestHarness( + StreamOperatorFactory operatorFactory, + final KeySelector keySelector, + TypeInformation keyType, + int maxParallelism, + int numSubtasks, + int subtaskIndex) + throws Exception { + super(operatorFactory, maxParallelism, numSubtasks, subtaskIndex); + + ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + config.setStatePartitioner(0, keySelector); + config.setStateKeySerializer( + keyType.createSerializer(executionConfig.getSerializerConfig())); + config.serializeAllConfigs(); + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public void setup(TypeSerializer outputSerializer) { + super.setup(outputSerializer); + if (operator instanceof MultipleInputStreamOperator) { + checkState(inputs.isEmpty()); + inputs.addAll(((MultipleInputStreamOperator) operator).getInputs()); + } + } + + public OneInputStreamOperator getOneInputOperator() { + return (OneInputStreamOperator) this.operator; + } + + public CompletableFuture processElement(StreamRecord element) throws Exception { + if (inputs.isEmpty()) { + return execute( + (ignore) -> + RecordProcessorUtils.getRecordProcessor(getOneInputOperator()) + .accept(element)); + } else { + checkState(inputs.size() == 1); + Input input = inputs.get(0); + return execute( + (ignore) -> + ((ThrowingConsumer) + RecordProcessorUtils.getRecordProcessor(input)) + .accept(element)); + } + } + + public CompletableFuture processWatermark(long watermark) throws Exception { + return processWatermark(new Watermark(watermark)); + } + + public CompletableFuture processWatermarkStatus(WatermarkStatus status) throws Exception { + if (inputs.isEmpty()) { + return execute((ignore) -> getOneInputOperator().processWatermarkStatus(status)); + } else { + checkState(inputs.size() == 1); + Input input = inputs.get(0); + return execute((ignore) -> input.processWatermarkStatus(status)); + } + } + + public CompletableFuture processWatermark(Watermark mark) throws Exception { + if (inputs.isEmpty()) { + return execute((ignore) -> getOneInputOperator().processWatermark(mark)); + } else { + checkState(inputs.size() == 1); + Input input = inputs.get(0); + return execute((ignore) -> input.processWatermark(mark)); + } + } + + public CompletableFuture processLatencyMarker(LatencyMarker marker) { + if (inputs.isEmpty()) { + return execute((ignore) -> getOneInputOperator().processLatencyMarker(marker)); + } else { + checkState(inputs.size() == 1); + Input input = inputs.get(0); + return execute((ignore) -> input.processLatencyMarker(marker)); + } + } + + public CompletableFuture processRecordAttributes(RecordAttributes recordAttributes) { + if (inputs.isEmpty()) { + return execute( + (ignore) -> getOneInputOperator().processRecordAttributes(recordAttributes)); + } else { + checkState(inputs.size() == 1); + Input input = inputs.get(0); + return execute((ignore) -> input.processRecordAttributes(recordAttributes)); + } + } + + private CompletableFuture execute(ThrowingConsumer processor) { + CompletableFuture future = new CompletableFuture(); + executor.execute( + () -> { + try { + processor.accept(null); + future.complete(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return future; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java new file mode 100644 index 0000000000000..63f214c0f2bfb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.asyncprocessing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A test harness for testing a {@link OneInputStreamOperator} which uses async state. + * + *

All methods that interact with the operator need to be executed in another thread to simulate + * async processing. + */ +public class AsyncKeyedTwoInputStreamOperatorTestHarness + extends AbstractStreamOperatorTestHarness { + + private final TwoInputStreamOperator twoInputOperator; + + private ThrowingConsumer, Exception> processor1; + private ThrowingConsumer, Exception> processor2; + + /** The executor service for async state processing. */ + private ExecutorService executor; + + public AsyncKeyedTwoInputStreamOperatorTestHarness( + TwoInputStreamOperator operator, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType, + int maxParallelism, + int numSubtasks, + int subtaskIndex) + throws Exception { + super(operator, maxParallelism, numSubtasks, subtaskIndex); + + ClosureCleaner.clean(keySelector1, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + ClosureCleaner.clean(keySelector2, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); + config.setStatePartitioner(0, keySelector1); + config.setStatePartitioner(1, keySelector2); + config.setStateKeySerializer( + keyType.createSerializer(executionConfig.getSerializerConfig())); + config.serializeAllConfigs(); + + this.twoInputOperator = operator; + this.executor = Executors.newSingleThreadExecutor(); + } + + private ThrowingConsumer, Exception> getRecordProcessor1() { + if (processor1 == null) { + processor1 = RecordProcessorUtils.getRecordProcessor1(twoInputOperator); + } + return processor1; + } + + private ThrowingConsumer, Exception> getRecordProcessor2() { + if (processor2 == null) { + processor2 = RecordProcessorUtils.getRecordProcessor2(twoInputOperator); + } + return processor2; + } + + public CompletableFuture processElement1(StreamRecord element) throws Exception { + return execute((ignore) -> getRecordProcessor1().accept(element)); + } + + public CompletableFuture processElement1(IN1 value, long timestamp) throws Exception { + return processElement1(new StreamRecord<>(value, timestamp)); + } + + public CompletableFuture processElement2(StreamRecord element) throws Exception { + return execute((ignore) -> getRecordProcessor2().accept(element)); + } + + public CompletableFuture processElement2(IN2 value, long timestamp) throws Exception { + return processElement2(new StreamRecord<>(value, timestamp)); + } + + public CompletableFuture processWatermark1(Watermark mark) throws Exception { + return execute((ignore) -> twoInputOperator.processWatermark1(mark)); + } + + public CompletableFuture processWatermark2(Watermark mark) throws Exception { + return execute((ignore) -> twoInputOperator.processWatermark2(mark)); + } + + public CompletableFuture processWatermarkStatus1(WatermarkStatus watermarkStatus) + throws Exception { + return execute((ignore) -> twoInputOperator.processWatermarkStatus1(watermarkStatus)); + } + + public CompletableFuture processWatermarkStatus2(WatermarkStatus watermarkStatus) + throws Exception { + return execute((ignore) -> twoInputOperator.processWatermarkStatus2(watermarkStatus)); + } + + public CompletableFuture processRecordAttributes1(RecordAttributes recordAttributes) + throws Exception { + return execute((ignore) -> twoInputOperator.processRecordAttributes1(recordAttributes)); + } + + public CompletableFuture processRecordAttributes2(RecordAttributes recordAttributes) + throws Exception { + return execute((ignore) -> twoInputOperator.processRecordAttributes2(recordAttributes)); + } + + private CompletableFuture execute(ThrowingConsumer processor) { + CompletableFuture future = new CompletableFuture(); + executor.execute( + () -> { + try { + processor.accept(null); + future.complete(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return future; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java index 719987fe72bec..18f79d6d2be5b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java @@ -31,20 +31,22 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; +import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedMultiInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend; @@ -96,17 +98,8 @@ void testRecordProcessorWithFirstStateOrder() throws Exception { testHarness.open(); SingleInputTestOperator testOperator = (SingleInputTestOperator) testHarness.getBaseOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -114,8 +107,7 @@ void testRecordProcessorWithFirstStateOrder() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); } } @@ -127,17 +119,8 @@ void testRecordProcessorWithRecordOrder() throws Exception { testHarness.open(); SingleInputTestOperator testOperator = (SingleInputTestOperator) testHarness.getBaseOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -147,8 +130,7 @@ void testRecordProcessorWithRecordOrder() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); } } @@ -170,17 +152,8 @@ void testAsyncProcessWithKey() throws Exception { testHarness.open(); SingleInputTestOperatorDirectAsyncProcess testOperator = (SingleInputTestOperatorDirectAsyncProcess) testHarness.getBaseOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - } catch (Exception e) { - } - }); + CompletableFuture future = + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(0); @@ -190,8 +163,7 @@ void testAsyncProcessWithKey() throws Exception { // Proceed processing testOperator.proceed(); - anotherThread.shutdown(); - Thread.sleep(1000); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); // We don't have the mailbox executor actually running, so the new context is blocked @@ -253,18 +225,9 @@ void testNonRecordProcess() throws Exception { testHarness.open(); SingleInputTestOperator testOperator = (SingleInputTestOperator) testHarness.getBaseOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.processRecordAttributes(new RecordAttributes(false), 1); - } catch (Exception e) { - } - }); + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); + CompletableFuture future = + testHarness.processRecordAttributes(new RecordAttributes(false)); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -272,8 +235,7 @@ void testNonRecordProcess() throws Exception { // Proceed processing testOperator.proceed(); - Thread.sleep(3000); - anotherThread.shutdown(); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); assertThat(testOperator.getAttributeProcessed()).isEqualTo(1); } @@ -286,22 +248,10 @@ void testWatermarkStatus() throws Exception { testHarness.open(); SingleInputTestOperator testOperator = (SingleInputTestOperator) testHarness.getBaseOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); - ExecutorService anotherThread = Executors.newSingleThreadExecutor(); - // Trigger the processor - anotherThread.execute( - () -> { - try { - processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); - testOperator.getInputs().get(0).processWatermark(new Watermark(205L)); - testOperator - .getInputs() - .get(0) - .processWatermarkStatus(WatermarkStatus.IDLE); - } catch (Exception e) { - } - }); + testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"))); + testHarness.processWatermark(new Watermark(205L)); + CompletableFuture future = + testHarness.processWatermarkStatus(WatermarkStatus.IDLE); Thread.sleep(1000); assertThat(testOperator.getProcessed()).isEqualTo(1); @@ -310,8 +260,7 @@ void testWatermarkStatus() throws Exception { // Proceed processing testOperator.proceed(); - Thread.sleep(3000); - anotherThread.shutdown(); + future.get(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); assertThat(testOperator.watermarkStatus.isActive()).isFalse(); assertThat(testHarness.getOutput()) @@ -322,8 +271,124 @@ void testWatermarkStatus() throws Exception { } } + @Test + void testIdleWatermarkHandling() throws Exception { + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + List> keySelectors = + Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector); + try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = + new AsyncKeyedMultiInputStreamOperatorTestHarness<>( + new WatermarkTestingOperatorFactory(), + BasicTypeInfo.INT_TYPE_INFO, + keySelectors, + 1, + 1, + 0)) { + testHarness.setup(); + testHarness.open(); + testHarness.processElement(0, new StreamRecord<>(1L, 1L)).get(); + testHarness.processElement(0, new StreamRecord<>(3L, 3L)).get(); + testHarness.processElement(0, new StreamRecord<>(4L, 4L)).get(); + testHarness.processWatermark(0, new Watermark(1L)); + assertThat(testHarness.getOutput()).isEmpty(); + + testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE).get(); + expectedOutput.add(new StreamRecord<>(1L)); + expectedOutput.add(new Watermark(1L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(0, new Watermark(3L)).get(); + expectedOutput.add(new StreamRecord<>(3L)); + expectedOutput.add(new Watermark(3L)); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermarkStatus(1, WatermarkStatus.ACTIVE).get(); + // the other input is active now, we should not emit the watermark + testHarness.processWatermark(0, new Watermark(4L)).get(); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + + @Test + void testIdlenessForwarding() throws Exception { + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + List> keySelectors = + Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector); + try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = + new AsyncKeyedMultiInputStreamOperatorTestHarness<>( + new WatermarkTestingOperatorFactory(), + BasicTypeInfo.INT_TYPE_INFO, + keySelectors, + 1, + 1, + 0)) { + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermarkStatus(0, WatermarkStatus.IDLE).get(); + testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE).get(); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE).get(); + expectedOutput.add(WatermarkStatus.IDLE); + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + + @Test + void testRecordAttributesForwarding() throws Exception { + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + KeySelector dummyKeySelector = l -> 0; + List> keySelectors = + Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector); + try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = + new AsyncKeyedMultiInputStreamOperatorTestHarness<>( + new WatermarkTestingOperatorFactory(), + BasicTypeInfo.INT_TYPE_INFO, + keySelectors, + 1, + 1, + 0)) { + + testHarness.setup(); + testHarness.open(); + + final RecordAttributes backlogRecordAttributes = + new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build(); + final RecordAttributes nonBacklogRecordAttributes = + new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build(); + + testHarness.processRecordAttributes(0, backlogRecordAttributes).get(); + testHarness.processRecordAttributes(1, backlogRecordAttributes).get(); + testHarness.processRecordAttributes(2, backlogRecordAttributes).get(); + expectedOutput.add(backlogRecordAttributes); + expectedOutput.add(backlogRecordAttributes); + expectedOutput.add(backlogRecordAttributes); + + testHarness.processRecordAttributes(0, nonBacklogRecordAttributes).get(); + testHarness.processRecordAttributes(1, nonBacklogRecordAttributes).get(); + testHarness.processRecordAttributes(2, nonBacklogRecordAttributes).get(); + expectedOutput.add(backlogRecordAttributes); + expectedOutput.add(backlogRecordAttributes); + expectedOutput.add(nonBacklogRecordAttributes); + + TestHarnessUtil.assertOutputEquals( + "Output was not correct", expectedOutput, testHarness.getOutput()); + } + } + private static class KeyedOneInputStreamOperatorV2TestHarness - extends KeyedOneInputStreamOperatorTestHarness { + extends AsyncKeyedOneInputStreamOperatorTestHarness { public KeyedOneInputStreamOperatorV2TestHarness( StreamOperatorFactory operatorFactory, final KeySelector keySelector, @@ -508,4 +573,59 @@ public void processElement(StreamRecord> element) }; } } + + private static class WatermarkTestingOperatorFactory + extends AbstractStreamOperatorFactory { + @Override + public > T createStreamOperator( + StreamOperatorParameters parameters) { + return (T) new WatermarkTestingOperator(parameters); + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return WatermarkTestingOperator.class; + } + } + + private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperatorV2 + implements MultipleInputStreamOperator, Triggerable { + + private transient InternalTimerService timerService; + + public WatermarkTestingOperator(StreamOperatorParameters parameters) { + super(parameters, 3); + } + + @Override + public void open() throws Exception { + super.open(); + + this.timerService = + getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + output.collect(new StreamRecord<>(timer.getTimestamp())); + } + + @Override + public void onProcessingTime(InternalTimer timer) + throws Exception {} + + private Input createInput(int idx) { + return new AbstractInput(this, idx) { + @Override + public void processElement(StreamRecord element) throws Exception { + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue()); + } + }; + } + + @Override + public List getInputs() { + return Arrays.asList(createInput(1), createInput(2), createInput(3)); + } + } }