Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35904][test] Test harness for async state processing operators #25802

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
Expand All @@ -33,19 +32,22 @@
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncStateBackend;
Expand All @@ -54,14 +56,14 @@
/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
public class AbstractAsyncStateStreamOperatorTest {

protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
protected AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
createTestHarness(
int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder)
throws Exception {
TestOperator testOperator = new TestOperator(elementOrder);
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
new AsyncKeyedOneInputStreamOperatorTestHarness<>(
testOperator,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
Expand All @@ -74,7 +76,7 @@ public class AbstractAsyncStateStreamOperatorTest {

@Test
void testCreateAsyncExecutionController() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
assertThat(testHarness.getOperator())
Expand All @@ -93,51 +95,32 @@ void testCreateAsyncExecutionController() throws Exception {

@Test
void testRecordProcessorWithFirstStateOrder() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
// Trigger the processor
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
} catch (Exception e) {
}
});
CompletableFuture<Void> future =
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
future.get();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
}
}

@Test
void testRecordProcessorWithRecordOrder() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
// Trigger the processor
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
} catch (Exception e) {
}
});
CompletableFuture<Void> future =
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
Expand All @@ -147,8 +130,7 @@ void testRecordProcessorWithRecordOrder() throws Exception {

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
future.get();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
}
}
Expand All @@ -157,9 +139,9 @@ void testRecordProcessorWithRecordOrder() throws Exception {
void testAsyncProcessWithKey() throws Exception {
TestOperatorWithDirectAsyncProcess testOperator =
new TestOperatorWithDirectAsyncProcess(ElementOrder.RECORD_ORDER);
KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
new AsyncKeyedOneInputStreamOperatorTestHarness<>(
testOperator,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
Expand All @@ -171,15 +153,8 @@ void testAsyncProcessWithKey() throws Exception {
testHarness.open();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
// Trigger the processor
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
} catch (Exception e) {
}
});
CompletableFuture<Void> future =
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(0);
Expand All @@ -189,8 +164,7 @@ void testAsyncProcessWithKey() throws Exception {

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
future.get();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);

// We don't have the mailbox executor actually running, so the new context is blocked
Expand All @@ -203,11 +177,9 @@ void testAsyncProcessWithKey() throws Exception {

@Test
void testCheckpointDrain() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
CheckpointStorageLocationReference locationReference =
CheckpointStorageLocationReference.getDefault();
AsyncExecutionController asyncExecutionController =
((AbstractAsyncStateStreamOperator) testHarness.getOperator())
.getAsyncExecutionController();
Expand All @@ -225,7 +197,7 @@ void testCheckpointDrain() throws Exception {

@Test
void testTimerServiceIsAsync() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
assertThat(testHarness.getOperator())
Expand All @@ -249,22 +221,13 @@ public void onProcessingTime(InternalTimer timer) throws Exception {}

@Test
void testNonRecordProcess() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processLatencyMarker(
new LatencyMarker(1234, new OperatorID(), 0));
} catch (Exception e) {
}
});
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
CompletableFuture<Void> future =
testHarness.processLatencyMarker(new LatencyMarker(1234, new OperatorID(), 0));

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
Expand All @@ -274,32 +237,24 @@ void testNonRecordProcess() throws Exception {

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
future.get();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.getLatencyProcessed()).isEqualTo(1);
}
}

@Test
void testWatermarkStatus() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
ExecutorService anotherThread = Executors.newSingleThreadExecutor();
anotherThread.execute(
() -> {
try {
processor.accept(new StreamRecord<>(Tuple2.of(5, "5")));
testOperator.processWatermark1(new Watermark(205L));
testOperator.processWatermark2(new Watermark(105L));
testOperator.processWatermarkStatus(WatermarkStatus.IDLE, 1);
} catch (Exception e) {
}
});
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
testHarness.processWatermark(new Watermark(205L));
CompletableFuture<Void> future =
testHarness.processWatermarkStatus(WatermarkStatus.IDLE);

Thread.sleep(1000);
assertThat(testOperator.getProcessed()).isEqualTo(1);
Expand All @@ -310,15 +265,57 @@ void testWatermarkStatus() throws Exception {

// Proceed processing
testOperator.proceed();
anotherThread.shutdown();
Thread.sleep(1000);
future.get();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.watermarkStatus.isActive()).isFalse();
assertThat(testHarness.getOutput())
.containsExactly(
new StreamRecord<>("EventTimer-5-105"),
new Watermark(105L),
new Watermark(205L));
new Watermark(205L),
WatermarkStatus.IDLE);
}
}

@Test
void testIdleWatermarkHandling() throws Exception {
final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
KeySelector<Long, Integer> dummyKeySelector = l -> 0;
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
new AsyncKeyedTwoInputStreamOperatorTestHarness<>(
testOperator,
dummyKeySelector,
dummyKeySelector,
BasicTypeInfo.INT_TYPE_INFO,
1,
1,
0)) {
testHarness.setup();
testHarness.open();
testHarness.processElement1(1L, 1L).get();
testHarness.processElement1(3L, 3L).get();
testHarness.processElement1(4L, 4L).get();
testHarness.processWatermark1(new Watermark(1L)).get();
assertThat(testHarness.getOutput()).isEmpty();

testHarness.processWatermarkStatus2(WatermarkStatus.IDLE).get();
expectedOutput.add(new StreamRecord<>(1L));
expectedOutput.add(new Watermark(1L));
TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput, testHarness.getOutput());

testHarness.processWatermark1(new Watermark(3L)).get();
expectedOutput.add(new StreamRecord<>(3L));
expectedOutput.add(new Watermark(3L));
TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput, testHarness.getOutput());

testHarness.processWatermarkStatus2(WatermarkStatus.ACTIVE).get();
// the other input is active now, we should not emit the watermark
testHarness.processWatermark1(new Watermark(4L));
TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput, testHarness.getOutput());
}
}

Expand Down Expand Up @@ -432,6 +429,40 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
}
}

private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOperator<Long>
implements TwoInputStreamOperator<Long, Long, Long>,
Triggerable<Integer, VoidNamespace> {

private transient InternalTimerService<VoidNamespace> timerService;

@Override
public void open() throws Exception {
super.open();

this.timerService =
getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(new StreamRecord<>(timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer)
throws Exception {}

@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}

@Override
public void processElement2(StreamRecord<Long> element) throws Exception {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}
}

/** {@link KeySelector} for tests. */
public static class TestKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
private static final long serialVersionUID = 1L;
Expand Down
Loading