Skip to content

Commit b2b99fe

Browse files
Poorvankbhatia authored and fapaul committed
[FLINK-32695] Migrated SavepointReaderITTestBase to SourceV2
1 parent 9e977ce commit b2b99fe

File tree

2 files changed

+58
-22
lines changed

2 files changed

+58
-22
lines changed

flink-libraries/flink-state-processing-api/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ under the License.
114114
<scope>test</scope>
115115
</dependency>
116116

117+
<dependency>
118+
<groupId>org.apache.flink</groupId>
119+
<artifactId>flink-test-utils-connector</artifactId>
120+
<version>${project.version}</version>
121+
<scope>test</scope>
122+
</dependency>
123+
117124
</dependencies>
118125

119126
<build>

flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderITTestBase.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,20 @@
1919
package org.apache.flink.state.api;
2020

2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2223
import org.apache.flink.api.common.functions.OpenContext;
2324
import org.apache.flink.api.common.state.ListState;
2425
import org.apache.flink.api.common.state.ListStateDescriptor;
2526
import org.apache.flink.api.common.state.MapStateDescriptor;
2627
import org.apache.flink.api.common.time.Deadline;
28+
import org.apache.flink.api.connector.source.ReaderOutput;
29+
import org.apache.flink.api.connector.source.SourceReaderContext;
30+
import org.apache.flink.api.connector.source.SplitEnumerator;
31+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2732
import org.apache.flink.api.java.tuple.Tuple2;
2833
import org.apache.flink.client.program.ClusterClient;
2934
import org.apache.flink.core.execution.SavepointFormatType;
35+
import org.apache.flink.core.io.InputStatus;
3036
import org.apache.flink.runtime.jobgraph.JobGraph;
3137
import org.apache.flink.runtime.state.FunctionInitializationContext;
3238
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -37,8 +43,11 @@
3743
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3844
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
3945
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
40-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
4146
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
47+
import org.apache.flink.test.util.source.AbstractTestSource;
48+
import org.apache.flink.test.util.source.SingleSplitEnumerator;
49+
import org.apache.flink.test.util.source.TestSourceReader;
50+
import org.apache.flink.test.util.source.TestSplit;
4251
import org.apache.flink.util.AbstractID;
4352
import org.apache.flink.util.Collector;
4453

@@ -88,7 +97,12 @@ public void testOperatorStateInputFormat() throws Exception {
8897
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
8998
env.setParallelism(4);
9099

91-
DataStream<Integer> data = env.addSource(new SavepointSource()).rebalance();
100+
DataStream<Integer> data =
101+
env.fromSource(
102+
new SavepointSource(),
103+
WatermarkStrategy.noWatermarks(),
104+
"SavepointSource")
105+
.rebalance();
92106

93107
StatefulOperator statefulOperator = new StatefulOperator(list, union, broadcast);
94108
data.connect(data.broadcast(broadcast))
@@ -209,35 +223,50 @@ private String takeSavepoint(JobGraph jobGraph) throws Exception {
209223
}
210224
}
211225

212-
private static class SavepointSource implements SourceFunction<Integer> {
226+
private static class SavepointSource extends AbstractTestSource<Integer> {
213227
private static volatile boolean finished;
214228

215-
private volatile boolean running = true;
216-
217229
private static final Integer[] elements = {1, 2, 3};
218230

219231
@Override
220-
public void run(SourceContext<Integer> ctx) {
221-
synchronized (ctx.getCheckpointLock()) {
222-
for (Integer element : elements) {
223-
ctx.collect(element);
224-
}
232+
public SplitEnumerator<TestSplit, Void> createEnumerator(
233+
SplitEnumeratorContext<TestSplit> enumContext) {
234+
return new SingleSplitEnumerator(enumContext);
235+
}
225236

226-
finished = true;
227-
}
237+
@Override
238+
public TestSourceReader<Integer> createReader(SourceReaderContext readerContext) {
239+
return new TestSourceReader<>(readerContext) {
240+
private boolean receivedSplit = false;
241+
private CompletableFuture<Void> availability = new CompletableFuture<>();
242+
243+
@Override
244+
public void addSplits(List<TestSplit> splits) {
245+
if (!splits.isEmpty()) {
246+
receivedSplit = true;
247+
if (!availability.isDone()) {
248+
availability.complete(null);
249+
}
250+
}
251+
}
228252

229-
while (running) {
230-
try {
231-
Thread.sleep(100);
232-
} catch (InterruptedException e) {
233-
// ignore
253+
@Override
254+
public InputStatus pollNext(ReaderOutput<Integer> output) {
255+
if (receivedSplit) {
256+
for (Integer element : elements) {
257+
output.collect(element);
258+
}
259+
finished = true;
260+
availability = new CompletableFuture<>();
261+
}
262+
return InputStatus.NOTHING_AVAILABLE;
234263
}
235-
}
236-
}
237264

238-
@Override
239-
public void cancel() {
240-
running = false;
265+
@Override
266+
public CompletableFuture<Void> isAvailable() {
267+
return availability;
268+
}
269+
};
241270
}
242271

243272
private static void initializeForTest() {

0 commit comments

Comments (0)