diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/pom.xml
new file mode 100644
index 00000000000..1028cec4d4f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.18</artifactId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-kafka-v1.18</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-kafka</name>
+
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+        <flink.connector.kafka.version>3.2.0-1.18</flink.connector.kafka.version>
+        <kafka.version>3.4.0</kafka.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.connector.kafka.version}</version>
+        </dependency>
+    </dependencies>
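+    <!-- Shade the InLong sort classes, the Kafka client and flink-connector-kafka into a single
+         connector jar so the connector can be deployed standalone; signature files are excluded
+         to keep the shaded jar valid. -->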
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    <include>io.streamnative.connectors:kafka-flink-connector-origin*</include>
+                                    <include>io.streamnative.connectors:flink-protobuf</include>
+                                    <include>org.apache.kafka:*</include>
+                                    <include>org.apache.flink:flink-connector-kafka</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>org.bouncycastle*:*</include>
+                                    <include>javax.*:*</include>
+                                    <include>org.lz4*:*</include>
+                                    <include>org.slf4j:jul-to-slf4j</include>
+                                    <include>io.airlift:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
new file mode 100644
index 00000000000..6833e557f26
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.inlong.sort.kafka.source.reader.KafkaSourceReader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
+import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
+ * KafkaSource}. The following example shows how to create a KafkaSource emitting records of
+ * <code>String</code> type.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ *     .setGroupId("MyGroup")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ *     .setStartingOffsets(OffsetsInitializer.earliest())
+ *     .build();
+ * }</pre>
+ *
+ * <p>{@link org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator} only supports
+ * adding new splits and not removing splits in split discovery.
+ *
+ * <p>See {@link KafkaSourceBuilder} for more details on how to configure this source.
+ *
+ * @param <OUT> the output type of the source.
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
+ */
+// TODO: Add a variable metricSchema to report audit information
+public class KafkaSource<OUT>
+ implements
+ Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+ ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = -8755372893283732098L;
+ // Users can choose only one of the following ways to specify the topics to consume from.
+ private final KafkaSubscriber subscriber;
+ // Users can specify the starting / stopping offset initializer.
+ private final OffsetsInitializer startingOffsetsInitializer;
+ private final OffsetsInitializer stoppingOffsetsInitializer;
+ // Boundedness
+ private final Boundedness boundedness;
+ private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+ // The configurations.
+ private final Properties props;
+ // Client rackId callback
+ private final SerializableSupplier<String> rackIdSupplier;
+
+ KafkaSource(
+ KafkaSubscriber subscriber,
+ OffsetsInitializer startingOffsetsInitializer,
+ @Nullable OffsetsInitializer stoppingOffsetsInitializer,
+ Boundedness boundedness,
+ KafkaRecordDeserializationSchema<OUT> deserializationSchema,
+ Properties props,
+ SerializableSupplier<String> rackIdSupplier) {
+ this.subscriber = subscriber;
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ this.boundedness = boundedness;
+ this.deserializationSchema = deserializationSchema;
+ this.props = props;
+ this.rackIdSupplier = rackIdSupplier;
+ }
+
+ /**
+ * Get a kafkaSourceBuilder to build a {@link KafkaSource}.
+ *
+ * @return a Kafka source builder.
+ */
+ public static <OUT> KafkaSourceBuilder<OUT> builder() {
+ return new KafkaSourceBuilder<>();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return this.boundedness;
+ }
+
+ @Override
+ public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return createReader(readerContext, (ignore) -> {
+ });
+ }
+
+ @VisibleForTesting
+ SourceReader<OUT, KafkaPartitionSplit> createReader(
+ SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
+ throws Exception {
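+ // Hand-over queue between the KafkaPartitionSplitReader fetcher threads and the
+ // source reader's main thread.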
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
+ new FutureCompletingBlockingQueue<>();
+ deserializationSchema.open(
+ new DeserializationSchema.InitializationContext() {
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return readerContext.metricGroup().addGroup("deserializer");
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return readerContext.getUserCodeClassLoader();
+ }
+ });
+ final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
+ new KafkaSourceReaderMetrics(readerContext.metricGroup());
+
+ Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
+ () -> new KafkaPartitionSplitReader(
+ props,
+ readerContext,
+ kafkaSourceReaderMetrics,
+ Optional.ofNullable(rackIdSupplier)
+ .map(Supplier::get)
+ .orElse(null));
+ KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+
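+ // Wire together the fetcher manager (which owns the split reader threads), the record
+ // emitter (which applies the deserialization schema) and the reader metrics.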
+ return new KafkaSourceReader<>(
+ elementsQueue,
+ new KafkaSourceFetcherManager(
+ elementsQueue, splitReaderSupplier::get, splitFinishedHook),
+ recordEmitter,
+ toConfiguration(props),
+ readerContext,
+ kafkaSourceReaderMetrics);
+ }
+
+ @Override
+ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
+ SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
+ return new KafkaSourceEnumerator(
+ subscriber,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer,
+ props,
+ enumContext,
+ boundedness);
+ }
+
+ @Override
+ public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
+ SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+ KafkaSourceEnumState checkpoint)
+ throws IOException {
+ return new KafkaSourceEnumerator(
+ subscriber,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer,
+ props,
+ enumContext,
+ boundedness,
+ checkpoint);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
+ return new KafkaPartitionSplitSerializer();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
+ return new KafkaSourceEnumStateSerializer();
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ // ----------- private helper methods ---------------
+
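+ // Copy the consumer properties into a Flink Configuration so that the reader options used
+ // by SourceReaderBase (e.g. the element queue capacity) can be resolved from the same set.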
+ private Configuration toConfiguration(Properties props) {
+ Configuration config = new Configuration();
+ props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
+ return config;
+ }
+
+ @VisibleForTesting
+ Configuration getConfiguration() {
+ return toConfiguration(props);
+ }
+
+ @VisibleForTesting
+ KafkaSubscriber getKafkaSubscriber() {
+ return subscriber;
+ }
+
+ @VisibleForTesting
+ OffsetsInitializer getStoppingOffsetsInitializer() {
+ return stoppingOffsetsInitializer;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
new file mode 100644
index 00000000000..4cfa147e977
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link KafkaSource} to make it easier for the users to construct a {@link
+ * KafkaSource}.
+ *
+ * <p>The following example shows the minimum setup to create a KafkaSource that reads the String
+ * values from a Kafka topic.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The bootstrap servers, topics/partitions to consume, and the record deserializer are required
+ * fields that must be set.
+ *
+ * <p>To specify the starting offsets of the KafkaSource, one can call {@link
+ * #setStartingOffsets(OffsetsInitializer)}.
+ *
+ * <p>By default the KafkaSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the KafkaSource run as {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} yet stop at some given offsets, one can call {@link
+ * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource stops after it consumes
+ * up to the latest partition offsets at the point when the Flink job started.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ *     .<String>builder()
+ *     .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ *     .setUnbounded(OffsetsInitializer.latest())
+ *     .setRackId(() -> MY_RACK_ID)
+ *     .build();
+ * }</pre>
+ *
+ * <p>Check the Java docs of each individual method to learn more about the settings to build a
+ * KafkaSource.
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
+ */
+// TODO: Add a variable metricSchema to report audit information
+public class KafkaSourceBuilder<OUT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
+ private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG};
+ // The subscriber specifies the partitions to subscribe to.
+ private KafkaSubscriber subscriber;
+ // Users can specify the starting / stopping offset initializer.
+ private OffsetsInitializer startingOffsetsInitializer;
+ private OffsetsInitializer stoppingOffsetsInitializer;
+ // Boundedness
+ private Boundedness boundedness;
+ private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+ // The configurations.
+ protected Properties props;
+ // Client rackId supplier
+ private SerializableSupplier<String> rackIdSupplier;
+
+ KafkaSourceBuilder() {
+ this.subscriber = null;
+ this.startingOffsetsInitializer = OffsetsInitializer.earliest();
+ this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.deserializationSchema = null;
+ this.props = new Properties();
+ this.rackIdSupplier = null;
+ }
+
+ /**
+ * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
+ *
+ * @param bootstrapServers the bootstrap servers of the Kafka cluster.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) {
+ return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+
+ /**
+ * Sets the consumer group id of the KafkaSource.
+ *
+ * @param groupId the group id of the KafkaSource.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setGroupId(String groupId) {
+ return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ }
+
+ /**
+ * Set a list of topics the KafkaSource should consume from. All the topics in the list must
+ * already exist in the Kafka cluster; otherwise an exception will be thrown. To allow some of
+ * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead.
+ *
+ * @param topics the list of topics to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ */
+ public KafkaSourceBuilder<OUT> setTopics(List<String> topics) {
+ ensureSubscriberIsNull("topics");
+ subscriber = KafkaSubscriber.getTopicListSubscriber(topics);
+ return this;
+ }
+
+ /**
+ * Set a list of topics the KafkaSource should consume from. All the topics in the list must
+ * already exist in the Kafka cluster; otherwise an exception will be thrown. To allow some of
+ * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead.
+ *
+ * @param topics the list of topics to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ */
+ public KafkaSourceBuilder<OUT> setTopics(String... topics) {
+ return setTopics(Arrays.asList(topics));
+ }
+
+ /**
+ * Set a topic pattern to consume from, using a Java {@link Pattern}.
+ *
+ * @param topicPattern the pattern of the topic name to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
+ */
+ public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) {
+ ensureSubscriberIsNull("topic pattern");
+ subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern);
+ return this;
+ }
+
+ /**
+ * Set a set of partitions to consume from.
+ *
+ * @param partitions the set of partitions to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)
+ */
+ public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> partitions) {
+ ensureSubscriberIsNull("partitions");
+ subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
+ return this;
+ }
+
+ /**
+ * Set a custom Kafka subscriber to use to discover new splits.
+ *
+ * @param kafkaSubscriber the {@link KafkaSubscriber} to use for split discovery.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setKafkaSubscriber(KafkaSubscriber kafkaSubscriber) {
+ ensureSubscriberIsNull("custom");
+ this.subscriber = checkNotNull(kafkaSubscriber);
+ return this;
+ }
+
+ /**
+ * Specify from which offsets the KafkaSource should start consuming from by providing an {@link
+ * OffsetsInitializer}.
+ *
+ * The following {@link OffsetsInitializer}s are commonly used and provided out of the box.
+ * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+ *
+ *
+ * - {@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is
+ * also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets.
+ *
- {@link OffsetsInitializer#latest()} - starting from the latest offsets.
+ *
- {@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of
+ * the consumer group.
+ *
- {@link
+ * OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)}
+ * - starting from the committed offsets of the consumer group. If there is no committed
+ * offsets, starting from the offsets specified by the {@link
+ * org.apache.kafka.clients.consumer.OffsetResetStrategy OffsetResetStrategy}.
+ *
- {@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each
+ * partition.
+ *
- {@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for
+ * each partition. Note that the guarantee here is that all the records in Kafka whose
+ * {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than
+ * the given starting timestamp will be consumed. However, it is possible that some
+ * consumer records whose timestamp is smaller than the given starting timestamp are also
+ * consumed.
+ *
+ *
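+ * <p>For example, to start from the consumer group's committed offsets and fall back to the
+ * earliest offset when no committed offset exists, the builder might be configured as follows
+ * (illustrative snippet):
+ *
+ * <pre>{@code
+ * builder.setStartingOffsets(
+ *         OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
+ * }</pre>
+ *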
+ * @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets
+ * for the Source.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setStartingOffsets(
+ OffsetsInitializer startingOffsetsInitializer) {
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+ * By default the KafkaSource is set to run as {@link Boundedness#CONTINUOUS_UNBOUNDED} and thus
+ * never stops until the Flink job fails or is canceled. To let the KafkaSource run as a
+ * streaming source but still stop at some point, one can set an {@link OffsetsInitializer} to
+ * specify the stopping offsets for each partition. When all the partitions have reached their
+ * stopping offsets, the KafkaSource will then exit.
+ *
+ * <p>This method is different from {@link #setBounded(OffsetsInitializer)} in that after
+ * setting the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will
+ * still return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the
+ * stopping offsets specified by the stopping offsets {@link OffsetsInitializer}.
+ *
+ * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
+ * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+ *
+ * <ul>
+ * <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
+ * the KafkaSource starts to run.
+ * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
+ * consumer group.
+ * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
+ * partition.
+ * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
+ * partition. The guarantee of setting the stopping timestamp is that no Kafka records
+ * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+ * than the given stopping timestamp will be consumed. However, it is possible that some
+ * records whose timestamp is smaller than the specified stopping timestamp are not
+ * consumed.
+ * </ul>
+ *
+ * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping
+ * offset.
+ * @return this KafkaSourceBuilder.
+ * @see #setBounded(OffsetsInitializer)
+ */
+ public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) {
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+ * By default the KafkaSource is set to run as {@link Boundedness#CONTINUOUS_UNBOUNDED} and thus
+ * never stops until the Flink job fails or is canceled. To let the KafkaSource run as {@link
+ * Boundedness#BOUNDED} and stop at some point, one can set an {@link OffsetsInitializer} to
+ * specify the stopping offsets for each partition. When all the partitions have reached their
+ * stopping offsets, the KafkaSource will then exit.
+ *
+ * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} in that after
+ * setting the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will
+ * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
+ *
+ * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
+ * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+ *
+ * <ul>
+ * <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
+ * the KafkaSource starts to run.
+ * <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
+ * consumer group.
+ * <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
+ * partition.
+ * <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
+ * partition. The guarantee of setting the stopping timestamp is that no Kafka records
+ * whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+ * than the given stopping timestamp will be consumed. However, it is possible that some
+ * records whose timestamp is smaller than the specified stopping timestamp are not
+ * consumed.
+ * </ul>
+ *
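+ * <p>For example, a bounded backfill that reads each partition up to its latest offset at the
+ * time the job starts might be configured as follows (illustrative snippet):
+ *
+ * <pre>{@code
+ * builder.setBounded(OffsetsInitializer.latest());
+ * }</pre>
+ *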
+ * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping
+ * offsets.
+ * @return this KafkaSourceBuilder.
+ * @see #setUnbounded(OffsetsInitializer)
+ */
+ public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
+ this.boundedness = Boundedness.BOUNDED;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+ * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
+ * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
+ *
+ * @param recordDeserializer the deserializer for Kafka {@link
+ * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setDeserializer(
+ KafkaRecordDeserializationSchema