diff --git a/flink-test-utils-parent/flink-test-utils-source/pom.xml b/flink-test-utils-parent/flink-test-utils-source/pom.xml
new file mode 100644
index 0000000000000..f5b290db3c2d5
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-test-utils-parent</artifactId>
+		<version>2.2-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-test-utils-source</artifactId>
+	<name>Flink : Test utils : Source</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
\ No newline at end of file
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSource.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSource.java
new file mode 100644
index 0000000000000..a7b20c4075c6d
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSource.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * Abstract base class for test sources that provides default implementations for all Source V2
+ * methods. This class fixes the enumerator checkpoint type to {@code Void} for the common case
+ * where no checkpoint state is needed.
+ *
+ * <p>For test cases that require checkpointing enumerator state, extend {@link
+ * AbstractTestSourceBase} directly with a custom checkpoint type.
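+ *
+ * <p>A minimal usage sketch (the element type and the emitted record are illustrative):
+ *
+ * <pre>{@code
+ * AbstractTestSource<String> source =
+ *         new AbstractTestSource<String>() {
+ *             public SourceReader<String, TestSplit> createReader(SourceReaderContext context) {
+ *                 return new TestSourceReader<String>(context) {
+ *                     public InputStatus pollNext(ReaderOutput<String> output) {
+ *                         output.collect("one record");
+ *                         return InputStatus.END_OF_INPUT;
+ *                     }
+ *                 };
+ *             }
+ *         };
+ * }</pre>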
+ *
+ * @param <T> The type of records produced by this source
+ */
+public abstract class AbstractTestSource<T> extends AbstractTestSourceBase<T, Void> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
+ return VoidSerializer.INSTANCE;
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSourceBase.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSourceBase.java
new file mode 100644
index 0000000000000..3c4e9f633d832
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/AbstractTestSourceBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * Base class for test sources that allows customization of the enumerator checkpoint type.
+ *
+ * <p>Most test cases should extend {@link AbstractTestSource}, which fixes the enumerator state to
+ * {@code Void}. Test cases that need checkpointing can create their own extensions of this base
+ * class.
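+ *
+ * <p>A sketch of such an extension with an {@code Integer} checkpoint type (the class and
+ * serializer names are hypothetical):
+ *
+ * <pre>{@code
+ * public class CountingTestSource extends AbstractTestSourceBase<String, Integer> {
+ *     public SimpleVersionedSerializer<Integer> getEnumeratorCheckpointSerializer() {
+ *         // hypothetical serializer for the Integer checkpoint state
+ *         return new CountingCheckpointSerializer();
+ *     }
+ * }
+ * }</pre>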
+ *
+ * @param <T> The type of records produced by this source
+ * @param <EnumChkptState> The type of the enumerator checkpoint state
+ */
+@Internal
+public abstract class AbstractTestSourceBase<T, EnumChkptState>
+ implements Source<T, TestSplit, EnumChkptState> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ /**
+ * Creates a source reader. The default implementation returns an empty reader that immediately
+ * returns END_OF_INPUT. Subclasses can override this method to provide their specific reader
+ * implementation.
+ */
+ @Override
+ public SourceReader<T, TestSplit> createReader(SourceReaderContext readerContext) {
+ return new TestSourceReader<T>(readerContext) {
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) {
+ return InputStatus.END_OF_INPUT;
+ }
+ };
+ }
+
+ @Override
+ public SplitEnumerator<TestSplit, EnumChkptState> createEnumerator(
+ SplitEnumeratorContext<TestSplit> enumContext) {
+ return createEnumerator(enumContext, null);
+ }
+
+ /**
+ * Creates a split enumerator. Subclasses can override this to provide custom enumerator
+ * behavior. The default implementation returns a {@link TestSplitEnumerator} that ignores the
+ * checkpoint state.
+ *
+ * @param enumContext The enumerator context
+ * @param checkpoint The checkpoint state to restore from, or null for a fresh start
+ * @return The split enumerator
+ */
+ @SuppressWarnings("unchecked")
+ protected SplitEnumerator<TestSplit, EnumChkptState> createEnumerator(
+ SplitEnumeratorContext<TestSplit> enumContext, EnumChkptState checkpoint) {
+ // Default implementation ignores checkpoint and returns the basic test enumerator
+ // This cast is safe because TestSplitEnumerator implements SplitEnumerator
+ // and subclasses that need different checkpoint types will override this method
+ return (SplitEnumerator<TestSplit, EnumChkptState>)
+ new TestSplitEnumerator<Void>(enumContext, null);
+ }
+
+ @Override
+ public SplitEnumerator<TestSplit, EnumChkptState> restoreEnumerator(
+ SplitEnumeratorContext<TestSplit> enumContext, EnumChkptState checkpoint) {
+ return createEnumerator(enumContext, checkpoint);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<TestSplit> getSplitSerializer() {
+ return TestSplit.SERIALIZER;
+ }
+
+ /**
+ * Returns the serializer for enumerator checkpoints. Subclasses must implement this to provide
+ * the appropriate serializer for their checkpoint type.
+ */
+ @Override
+ public abstract SimpleVersionedSerializer<EnumChkptState> getEnumeratorCheckpointSerializer();
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SharedSplitEnumerator.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SharedSplitEnumerator.java
new file mode 100644
index 0000000000000..dfd9332953de9
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SharedSplitEnumerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+/**
+ * A split enumerator where each reader gets the same split.
+ *
+ * <p>Useful for tests that need all readers to process the same data.
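+ *
+ * <p>A sketch of wiring this enumerator into a subclass of {@link AbstractTestSource} by
+ * overriding the enumerator factory method:
+ *
+ * <pre>{@code
+ * protected SplitEnumerator<TestSplit, Void> createEnumerator(
+ *         SplitEnumeratorContext<TestSplit> enumContext, Void checkpoint) {
+ *     return new SharedSplitEnumerator(enumContext);
+ * }
+ * }</pre>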
+ */
+@Internal
+public class SharedSplitEnumerator extends TestSplitEnumerator<Void> {
+
+ public SharedSplitEnumerator(SplitEnumeratorContext<TestSplit> context) {
+ super(context, null);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ context.assignSplit(TestSplit.INSTANCE, subtaskId);
+ context.signalNoMoreSplits(subtaskId);
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SingleSplitEnumerator.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SingleSplitEnumerator.java
new file mode 100644
index 0000000000000..58549e6fcc396
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/SingleSplitEnumerator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+/**
+ * A split enumerator where the first reader gets one split, others get nothing.
+ *
+ * <p>Useful for tests that need minimal data processing on a single subtask.
+ */
+@Internal
+public class SingleSplitEnumerator extends TestSplitEnumerator<Void> {
+
+ private boolean assigned = false;
+
+ public SingleSplitEnumerator(SplitEnumeratorContext<TestSplit> context) {
+ super(context, null);
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ if (!assigned) {
+ context.assignSplit(TestSplit.INSTANCE, subtaskId);
+ assigned = true;
+ }
+ context.signalNoMoreSplits(subtaskId);
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestReaderOutput.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestReaderOutput.java
new file mode 100644
index 0000000000000..e27652af587ba
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestReaderOutput.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test utility for capturing output from SourceReader implementations. Provides convenient methods
+ * to verify emitted records without implementing the full ReaderOutput interface.
+ *
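+ * <p>A short usage sketch, given some {@code reader} under test (the expected value is
+ * illustrative):
+ *
+ * <pre>{@code
+ * TestReaderOutput<String> output = new TestReaderOutput<>();
+ * reader.pollNext(output);
+ * assertEquals("expected", output.getLastEmitted());
+ * assertEquals(1, output.getCollectedCount());
+ * }</pre>
+ *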
+ * @param <T> The type of records emitted by the source
+ */
+@Internal
+public class TestReaderOutput<T> implements ReaderOutput<T> {
+
+ private final List<T> collected = new ArrayList<>();
+
+ @Override
+ public void collect(T record) {
+ collected.add(record);
+ }
+
+ @Override
+ public void collect(T record, long timestamp) {
+ collected.add(record);
+ }
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {
+ // No-op for testing
+ }
+
+ @Override
+ public ReaderOutput<T> createOutputForSplit(String splitId) {
+ return this;
+ }
+
+ @Override
+ public void markIdle() {
+ // No-op for testing
+ }
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ // No-op for testing
+ }
+
+ @Override
+ public void markActive() {
+ // No-op for testing
+ }
+
+ /**
+ * Returns all records collected by this output.
+ *
+ * @return List of all collected records
+ */
+ public List<T> getCollected() {
+ return new ArrayList<>(collected);
+ }
+
+ /**
+ * Returns the last emitted record, or null if no records have been emitted.
+ *
+ * @return The most recently emitted record
+ */
+ public T getLastEmitted() {
+ return collected.isEmpty() ? null : collected.get(collected.size() - 1);
+ }
+
+ /**
+ * Returns the number of records collected.
+ *
+ * @return Count of collected records
+ */
+ public int getCollectedCount() {
+ return collected.size();
+ }
+
+ /** Clears all collected records. */
+ public void clear() {
+ collected.clear();
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSourceReader.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSourceReader.java
new file mode 100644
index 0000000000000..64736a487561e
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSourceReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for test source readers that provides default implementations for all SourceReader
+ * methods. Tests can extend this class and override only the methods they need to customize,
+ * typically just {@link #pollNext(ReaderOutput)}.
+ *
+ * <p>By default, this reader:
+ *
+ * <ul>
+ *   <li>Returns {@link InputStatus#END_OF_INPUT} from {@link #pollNext(ReaderOutput)}
+ *   <li>Returns an empty list from {@link #snapshotState(long)}
+ *   <li>Provides a completed future from {@link #isAvailable()}
+ *   <li>No-ops for all other methods
+ * </ul>
+ *
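+ * <p>For example, a reader that emits a single record might look like this (the
+ * {@code readerContext} variable, the element type, and the value are illustrative):
+ *
+ * <pre>{@code
+ * SourceReader<Long, TestSplit> reader =
+ *         new TestSourceReader<Long>(readerContext) {
+ *             public InputStatus pollNext(ReaderOutput<Long> output) {
+ *                 output.collect(42L);
+ *                 return InputStatus.END_OF_INPUT;
+ *             }
+ *         };
+ * }</pre>
+ *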
+ * @param <T> The type of records produced by this reader
+ */
+public class TestSourceReader<T> implements SourceReader<T, TestSplit> {
+
+ protected final SourceReaderContext context;
+
+ public TestSourceReader(SourceReaderContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void start() {
+ // No-op by default
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
+ // By default, immediately signal end of input
+ // Tests should override this method to produce actual records
+ return InputStatus.END_OF_INPUT;
+ }
+
+ @Override
+ public List<TestSplit> snapshotState(long checkpointId) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CompletableFuture<Void> isAvailable() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addSplits(List<TestSplit> splits) {
+ // No-op by default - most test sources work with a single split
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ // No-op by default
+ }
+
+ @Override
+ public void close() throws Exception {
+ // No-op by default
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplit.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplit.java
new file mode 100644
index 0000000000000..5cd3e0df1dded
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplit.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * A simple test split implementation that can be reused across test sources. This provides a
+ * minimal split that is sufficient for most testing scenarios.
+ */
+public class TestSplit implements SourceSplit {
+
+ /** Singleton instance for reuse across all test sources. */
+ public static final TestSplit INSTANCE = new TestSplit();
+
+ /** Serializer for TestSplit instances. */
+ public static final SimpleVersionedSerializer<TestSplit> SERIALIZER = new TestSplitSerializer();
+
+ private static final String SPLIT_ID = "test-split";
+
+ private TestSplit() {
+ // Private constructor for singleton pattern
+ }
+
+ @Override
+ public String splitId() {
+ return SPLIT_ID;
+ }
+
+ @Override
+ public String toString() {
+ return "TestSplit{id=" + SPLIT_ID + "}";
+ }
+
+ /** Serializer for TestSplit that always returns the singleton instance. */
+ private static class TestSplitSerializer implements SimpleVersionedSerializer<TestSplit> {
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(TestSplit split) {
+ return new byte[0];
+ }
+
+ @Override
+ public TestSplit deserialize(int version, byte[] serialized) {
+ return INSTANCE;
+ }
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplitEnumerator.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplitEnumerator.java
new file mode 100644
index 0000000000000..c6f8a5b70f5f2
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/TestSplitEnumerator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+
+import java.util.List;
+
+/**
+ * Base split enumerator for test sources that provides sensible defaults for most methods.
+ *
+ * <p>By default, this enumerator immediately signals no more splits to ensure bounded completion.
+ * Test sources can extend this class and only override the methods they need, typically just
+ * {@link #snapshotState(long)} for checkpointing behavior or {@link #addReader(int)} for custom
+ * split assignment.
+ *
+ * <p>For common patterns, use the concrete implementations:
+ *
+ * <ul>
+ *   <li>{@link SingleSplitEnumerator} - the first reader gets one split, the others get nothing
+ *   <li>{@link SharedSplitEnumerator} - all readers get the same split
+ * </ul>
+ *
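+ * <p>A sketch of a custom assignment by overriding {@link #addReader(int)} (the
+ * {@code enumContext} variable and the subtask filter are illustrative):
+ *
+ * <pre>{@code
+ * SplitEnumerator<TestSplit, Void> enumerator =
+ *         new TestSplitEnumerator<Void>(enumContext, null) {
+ *             public void addReader(int subtaskId) {
+ *                 if (subtaskId == 0) {
+ *                     context.assignSplit(TestSplit.INSTANCE, subtaskId);
+ *                 }
+ *                 context.signalNoMoreSplits(subtaskId);
+ *             }
+ *         };
+ * }</pre>
+ *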
+ * @param <EnumChkptState> The type of the enumerator checkpoint state
+ */
+@Internal
+public class TestSplitEnumerator<EnumChkptState>
+ implements SplitEnumerator<TestSplit, EnumChkptState> {
+
+ protected final SplitEnumeratorContext<TestSplit> context;
+ protected final EnumChkptState checkpointState;
+
+ public TestSplitEnumerator(
+ SplitEnumeratorContext<TestSplit> context, EnumChkptState checkpointState) {
+ this.context = context;
+ this.checkpointState = checkpointState;
+ }
+
+ @Override
+ public void start() {
+ // No-op implementation
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, String requesterHostname) {
+ // No-op implementation
+ }
+
+ @Override
+ public void addSplitsBack(List<TestSplit> splits, int subtaskId) {
+ // No-op implementation
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // Signal no more splits to ensure immediate completion
+ context.signalNoMoreSplits(subtaskId);
+ }
+
+ @Override
+ public void close() {
+ // No-op implementation
+ }
+
+ /**
+ * Subclasses should override this method to provide their checkpoint state. The default
+ * implementation returns the checkpoint state passed in the constructor.
+ */
+ @Override
+ public EnumChkptState snapshotState(long checkpointId) {
+ return checkpointState;
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/VoidSerializer.java b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/VoidSerializer.java
new file mode 100644
index 0000000000000..c769b7f1f05f1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/main/java/org/apache/flink/test/util/source/VoidSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serializer for Void checkpoint states. This is useful for test sources that don't need to store
+ * any checkpoint state. The serializer always returns null during deserialization and stores no
+ * data during serialization.
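+ *
+ * <p>For example, {@link AbstractTestSource} uses it for its enumerator checkpoint serializer:
+ *
+ * <pre>{@code
+ * public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
+ *     return VoidSerializer.INSTANCE;
+ * }
+ * }</pre>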
+ */
+@Internal
+public enum VoidSerializer implements SimpleVersionedSerializer<Void> {
+ INSTANCE;
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(Void obj) throws IOException {
+ // Always return empty byte array for Void objects
+ return new byte[0];
+ }
+
+ @Override
+ public Void deserialize(int version, byte[] serialized) throws IOException {
+ // Always return null - Void objects have no state to restore
+ return null;
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-source/src/test/java/org/apache/flink/test/util/source/SourcePatternExamplesTest.java b/flink-test-utils-parent/flink-test-utils-source/src/test/java/org/apache/flink/test/util/source/SourcePatternExamplesTest.java
new file mode 100644
index 0000000000000..35b5d8c72cafb
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-source/src/test/java/org/apache/flink/test/util/source/SourcePatternExamplesTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests showing how to create sources using AbstractTestSource utilities for common patterns. */
+public class SourcePatternExamplesTest {
+
+ /*
+ * Testing: Source that must call external service on each data emission
+ * Why: Common pattern where legacy sources coordinate with external systems during processing
+ * Pattern: Business logic goes in pollNext(), everything else uses defaults from AbstractTestSource
+ */
+ @Test
+ public void testSourceThatInteractsWithProcessingTimeService() throws Exception {
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ List<Long> processingTimes = Arrays.asList(1000L, 2000L);
+ AbstractTestSource<Long> source =
+ new AbstractTestSource<Long>() {
+ @Override
+ public SourceReader<Long, TestSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new TestSourceReader<Long>(readerContext) {
+ private int currentIndex = 0;
+ private volatile boolean cancelled = false;
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Long> output) {
+ if (cancelled || currentIndex >= processingTimes.size()) {
+ return InputStatus.END_OF_INPUT;
+ }
+ Long processingTime = processingTimes.get(currentIndex++);
+ processingTimeService.setCurrentTime(processingTime);
+ output.collect(processingTime);
+ return currentIndex >= processingTimes.size()
+ ? InputStatus.END_OF_INPUT
+ : InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancelled = true;
+ super.close();
+ }
+ };
+ }
+ };
+
+ // Test the coordination pattern: verify each poll calls external service + emits data
+ SourceReader<Long, TestSplit> reader = source.createReader(null);
+ TestReaderOutput<Long> output = new TestReaderOutput<>();
+
+ // First poll: time advances to 1000L and emits it
+ reader.pollNext(output);
+ assertEquals(1000L, (long) output.getLastEmitted());
+ assertEquals(1000L, processingTimeService.getCurrentTime());
+
+ // Second poll: time advances to 2000L and emits it
+ reader.pollNext(output);
+ assertEquals(2000L, (long) output.getLastEmitted());
+ assertEquals(2000L, processingTimeService.getCurrentTime());
+
+ // Third poll: no more data, should signal end
+ InputStatus status = reader.pollNext(output);
+ assertEquals(InputStatus.END_OF_INPUT, status);
+ }
+
+ /*
+ * Testing: Source that generates infinite data stream without meaningful checkpointing
+ * Why: Common pattern for continuous data generation that doesn't actually checkpoint state
+ * Pattern: Use AbstractTestSource for unbounded sources that don't need real checkpointing
+ */
+ @Test
+ public void testInfiniteStringGeneratorWithoutCheckpointing() throws Exception {
+ AbstractTestSource<String> source =
+ new AbstractTestSource<>() {
+
+ @Override
+ public SourceReader<String, TestSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new TestSourceReader<>(readerContext) {
+ private volatile boolean running = true;
+ private int emissionCount = 0;
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<String> output)
+ throws Exception {
+ if (!running) {
+ return InputStatus.END_OF_INPUT;
+ }
+ output.collect("someString");
+ emissionCount++;
+ // Stop after 3 emissions to avoid infinite test
+ if (emissionCount >= 3) {
+ running = false;
+ return InputStatus.END_OF_INPUT;
+ }
+
+ return InputStatus.MORE_AVAILABLE;
+ }
+
+ @Override
+ public void close() throws Exception {
+ running = false;
+ super.close();
+ }
+ };
+ }
+ };
+
+ // Test data generation pattern
+ SourceReader<String, TestSplit> reader = source.createReader(null);
+ TestReaderOutput<String> output = new TestReaderOutput<>();
+
+ // First poll: generates "someString"
+ reader.pollNext(output);
+ assertEquals("someString", output.getLastEmitted());
+
+ // Second poll: generates another "someString"
+ reader.pollNext(output);
+ assertEquals("someString", output.getLastEmitted());
+ assertEquals(2, output.getCollectedCount());
+
+ // Third poll: generates final "someString" and signals end
+ InputStatus status = reader.pollNext(output);
+ assertEquals(InputStatus.END_OF_INPUT, status);
+ assertEquals(3, output.getCollectedCount());
+ }
+
+ /*
+ * Testing: Source that blocks using Thread.sleep() until external signal
+ * Why: Some sources legitimately need blocking behavior in pollNext()
+ * Pattern: Keep Thread.sleep() in pollNext() when blocking is actually required
+ */
+ @Test
+ public void testSourceWithLegitimateBlocking() throws Exception {
+ final boolean[] shouldCloseSource = {false};
+
+ AbstractTestSource<String> source =
+ new AbstractTestSource<>() {
+
+ @Override
+ public SourceReader<String, TestSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new TestSourceReader<String>(readerContext) {
+ private int pollCount = 0;
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<String> output)
+ throws Exception {
+ pollCount++;
+
+ if (shouldCloseSource[0]) {
+ return InputStatus.END_OF_INPUT;
+ }
+ Thread.sleep(100);
+ if (pollCount >= 2) {
+ shouldCloseSource[0] = true;
+ return InputStatus.END_OF_INPUT;
+ }
+
+ return InputStatus.NOTHING_AVAILABLE;
+ }
+
+ @Override
+ public void close() throws Exception {
+ shouldCloseSource[0] = true;
+ super.close();
+ }
+ };
+ }
+ };
+
+ // Test blocking behavior (but limit to avoid infinite test)
+ SourceReader<String, TestSplit> reader = source.createReader(null);
+ TestReaderOutput<String> output = new TestReaderOutput<>();
+ reader.pollNext(output);
+ assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
+ assertEquals(0, output.getCollectedCount());
+ }
+
+ /*
+ * Testing: Source that emits timestamped data and watermarks
+ * Why: Common pattern for event-time processing with explicit timestamp assignment
+ * Pattern: Use collect(record, timestamp) and emitWatermark() in pollNext()
+ */
+ @Test
+ public void testSourceWithTimestampsAndWatermarks() throws Exception {
+ AbstractTestSource<Integer> source =
+ new AbstractTestSource<Integer>() {
+ @Override
+ public SourceReader<Integer, TestSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new TestSourceReader<Integer>(readerContext) {
+ private int step = 0;
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Integer> output)
+ throws Exception {
+ switch (step++) {
+ case 0:
+ // Emit record with timestamp
+ output.collect(1, 0);
+ return InputStatus.MORE_AVAILABLE;
+ case 1:
+ // Emit watermark
+ output.emitWatermark(new Watermark(0));
+ return InputStatus.MORE_AVAILABLE;
+ case 2:
+ output.collect(2, 1);
+ return InputStatus.END_OF_INPUT;
+ default:
+ return InputStatus.END_OF_INPUT;
+ }
+ }
+ };
+ }
+ };
+
+ // Verify source handles timestamped data and watermarks
+ assertEquals(Boundedness.BOUNDED, source.getBoundedness());
+
+ // Test timestamp and watermark emission pattern
+ SourceReader<Integer, TestSplit> reader = source.createReader(null);
+ TestReaderOutput<Integer> output = new TestReaderOutput<>();
+
+ // Step through the sequence of emissions
+ assertEquals(InputStatus.MORE_AVAILABLE, reader.pollNext(output)); // collect(1, 0)
+ assertEquals(1, output.getCollectedCount());
+ assertEquals(Integer.valueOf(1), output.getLastEmitted());
+
+ assertEquals(InputStatus.MORE_AVAILABLE, reader.pollNext(output)); // emitWatermark(0)
+ assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output)); // collect(2, 1)
+ assertEquals(2, output.getCollectedCount());
+ assertEquals(Integer.valueOf(2), output.getLastEmitted());
+ }
+
+ /*
+ * Testing: Source with custom enumerator checkpointing that AbstractTestSource cannot handle
+ * Why: Demonstrates how to use AbstractTestSourceBase when you need real checkpoint state
+ * Pattern: Custom checkpoint type (Integer) with stateful enumerator that tracks emission count
+ */
+ @Test
+ public void testSourceWithCustomEnumeratorCheckpointing() throws Exception {
+ AbstractTestSourceBase<String, Integer> source =
+ new AbstractTestSourceBase<>() {
+ @Override
+ public SourceReader<String, TestSplit> createReader(
+ SourceReaderContext readerContext) {
+ return new TestSourceReader<>(readerContext) {
+ private int emissionCount = 0;
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<String> output) {
+ if (emissionCount >= 3) {
+ return InputStatus.END_OF_INPUT;
+ }
+ output.collect("data-" + emissionCount);
+ emissionCount++;
+ return emissionCount >= 3
+ ? InputStatus.END_OF_INPUT
+ : InputStatus.MORE_AVAILABLE;
+ }
+ };
+ }
+
+ @Override
+ protected SplitEnumerator<TestSplit, Integer> createEnumerator(
+ SplitEnumeratorContext<TestSplit> enumContext, Integer checkpoint) {
+ return new TestSplitEnumerator<Integer>(enumContext, checkpoint) {
+ @Override
+ public Integer snapshotState(long checkpointId) {
+ return checkpointState != null ? checkpointState + 1 : 1;
+ }
+ };
+ }
+
+ @Override
+ public SimpleVersionedSerializer<Integer> getEnumeratorCheckpointSerializer() {
+ return new SimpleVersionedSerializer<Integer>() {
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(Integer obj) throws IOException {
+ return obj != null ? obj.toString().getBytes() : new byte[0];
+ }
+
+ @Override
+ public Integer deserialize(int version, byte[] serialized) {
+ return serialized.length > 0
+ ? Integer.parseInt(new String(serialized))
+ : 0;
+ }
+ };
+ }
+ };
+
+ // Test source reader functionality
+ SourceReader<String, TestSplit> reader = source.createReader(null);
+ TestReaderOutput<String> output = new TestReaderOutput<>();
+
+ assertEquals(InputStatus.MORE_AVAILABLE, reader.pollNext(output));
+ assertEquals("data-0", output.getLastEmitted());
+
+ assertEquals(InputStatus.MORE_AVAILABLE, reader.pollNext(output));
+ assertEquals("data-1", output.getLastEmitted());
+
+ assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
+ assertEquals("data-2", output.getLastEmitted());
+ assertEquals(3, output.getCollectedCount());
+ }
+
+ /*
+ * Testing: No-op source that requires no reader implementation
+ * Why: Some tests just need a source for framework testing without data emission
+ * Pattern: Use AbstractTestSourceBase/AbstractTestSource directly without overriding createReader
+ */
+ @Test
+ public void testNoopSourceWithDefaultReader() throws Exception {
+ // Create a no-op source with Void checkpointing (most common case)
+ AbstractTestSource<String> noopSource = new AbstractTestSource<>() {};
+
+ // Test that it works without any reader implementation
+ SourceReader<String, TestSplit> reader = noopSource.createReader(null);
+ TestReaderOutput<String> output = new TestReaderOutput<>();
+
+ // Should immediately return END_OF_INPUT without emitting anything
+ assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
+ assertEquals(0, output.getCollectedCount());
+ assertEquals(null, output.getLastEmitted());
+
+ // Verify source properties
+ assertEquals(Boundedness.BOUNDED, noopSource.getBoundedness());
+ }
+
+ /*
+ * Testing: No-op source with custom checkpoint state but no data emission
+ * Why: Testing enumerator checkpointing behavior without reader complexity
+ * Pattern: Use AbstractTestSourceBase with custom state, rely on default empty reader
+ */
+ @Test
+ public void testNoopSourceWithCustomCheckpointing() throws Exception {
+ AbstractTestSourceBase<String, String> noopSourceWithCheckpoint =
+ new AbstractTestSourceBase<String, String>() {
+ @Override
+ public SimpleVersionedSerializer<String> getEnumeratorCheckpointSerializer() {
+ return new SimpleVersionedSerializer<String>() {
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(String obj) {
+ return obj != null ? obj.getBytes() : new byte[0];
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) {
+ return serialized.length > 0 ? new String(serialized) : "";
+ }
+ };
+ }
+ };
+
+ // Test that reader works without implementation
+ SourceReader<String, TestSplit> reader = noopSourceWithCheckpoint.createReader(null);
+ TestReaderOutput<String> output = new TestReaderOutput<>();
+
+ assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
+ assertEquals(0, output.getCollectedCount());
+
+ // Test enumerator checkpoint serialization
+ SimpleVersionedSerializer<String> serializer =
+ noopSourceWithCheckpoint.getEnumeratorCheckpointSerializer();
+ byte[] data = serializer.serialize("test-checkpoint");
+ String restored = serializer.deserialize(serializer.getVersion(), data);
+ assertEquals("test-checkpoint", restored);
+ }
+
+ // Helper classes for testing
+ private static class TestProcessingTimeService {
+ private long currentTime = 0;
+
+ public void setCurrentTime(long time) {
+ this.currentTime = time;
+ }
+
+ public long getCurrentTime() {
+ return currentTime;
+ }
+ }
+}
diff --git a/flink-test-utils-parent/pom.xml b/flink-test-utils-parent/pom.xml
index 5ae811888a7fd..f71f9cb19a7f4 100644
--- a/flink-test-utils-parent/pom.xml
+++ b/flink-test-utils-parent/pom.xml
@@ -37,6 +37,7 @@ under the License.
flink-test-utils-junit
flink-test-utils
+ flink-test-utils-source
flink-connector-test-utils
flink-clients-test-utils
flink-migration-test-utils