Skip to content

Commit

Permalink
[INLONG-9081][Sort] Pulsar connector in flink 1.15 should running in …
Browse files Browse the repository at this point in the history
…exclusive mode (apache#9082)
  • Loading branch information
EMsnap authored Oct 20, 2023
1 parent 6936c31 commit 9bd9b4a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
import static org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
Expand Down Expand Up @@ -179,7 +179,7 @@ public Set<ConfigOption<?>> optionalOptions() {
VALUE_FORMAT,
SOURCE_SUBSCRIPTION_NAME,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_START_FROM_MESSAGE_ID,
STARTUP_MODE,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
Expand All @@ -203,7 +203,7 @@ public Set<ConfigOption<?>> forwardOptions() {
SERVICE_URL,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_SUBSCRIPTION_NAME,
SOURCE_START_FROM_MESSAGE_ID,
STARTUP_MODE,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;

Expand Down Expand Up @@ -173,8 +173,8 @@ public static Properties getPulsarPropertiesWithPrefix(
}

public static StartCursor getStartCursor(ReadableConfig tableOptions) {
if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
} else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
return parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private PulsarTableOptions() {
* Copied because we want to have a default value for it.
*/
public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
ConfigOptions.key("source.subscription-name")
ConfigOptions.key("scan.startup.sub-name")
.stringType()
.noDefaultValue()
.withDescription(
Expand All @@ -88,8 +88,8 @@ private PulsarTableOptions() {
"The subscription name of the consumer that is used by the runtime [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). This argument is required for constructing the consumer.")
.build());

public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
ConfigOptions.key("source.start.message-id")
public static final ConfigOption<String> STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()
.noDefaultValue()
.withDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import static org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.pulsar.common.naming.TopicName.isValid;

Expand Down Expand Up @@ -91,12 +91,12 @@ protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
}

protected static void validateStartCursorConfigs(ReadableConfig tableOptions) {
if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
if (tableOptions.getOptional(STARTUP_MODE).isPresent()
&& tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
throw new ValidationException(
String.format(
"Only one of %s and %s can be specified. Detected both of them",
SOURCE_START_FROM_MESSAGE_ID, SOURCE_START_FROM_PUBLISH_TIME));
STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
Expand Down Expand Up @@ -119,7 +118,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
.setUnboundedStopCursor(stopCursor)
.setDeserializationSchema(deserializationSchema)
.setProperties(properties)
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
// only support exclusive since shared mode requires pulsar with transaction enabled
// and supporting transaction consumes more resources in pulsar broker
.setSubscriptionType(SubscriptionType.Exclusive)
.build();
return SourceProvider.of(source);
}
Expand Down

0 comments on commit 9bd9b4a

Please sign in to comment.