Skip to content

Commit

Permalink
[INLONG-8029][Sort] Use fixed subscription name when start a pulsar r…
Browse files Browse the repository at this point in the history
…eader (#8033)
  • Loading branch information
EMsnap authored May 16, 2023
1 parent af6c6c0 commit 3963987
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
| [INLONG-7769](https://github.com/apache/inlong/issues/7769) | [Bug][DataProxy] NPE when request Inlong Manager failed |
| [INLONG-7512](https://github.com/apache/inlong/issues/7512) | [Improve][DataProxy] Update the metrics log level to avoid the log file increasing quickly |
| [INLONG-7766](https://github.com/apache/inlong/issues/7766) | [Bug][DataProxySDK] Adjusted frame length exceeds occurred when reporting data through the HTTP protocol |
| [INLONG-7194](https://github.com/apache/inlong/issues/7194) | [Improve][DataProxy] Migrate index log statistics for the new mq layer |

### TubeMQ
| ISSUE | Summary |
Expand Down Expand Up @@ -155,6 +154,7 @@
| [INLONG-7970](https://github.com/apache/inlong/issues/7970) | [Bug][Sort] java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.table.data.StringData |
| [INLONG-7747](https://github.com/apache/inlong/issues/7747) | [Umbrella][Sort] Improve memory stability of data ingesting into iceberg |
| [INLONG-6545](https://github.com/apache/inlong/issues/6545) | [Improve][Sort] Accurately parse the schema type and completely match the missing precision information |
| [INLONG-8029](https://github.com/apache/inlong/issues/8029) | [Improve][Sort] Use fixed subscription name when start a pulsar reader |

### Audit
| ISSUE | Summary |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ public void onException(Throwable cause) {
getRuntimeContext().getUserCodeClassLoader(),
streamingRuntime,
useMetrics,
excludeStartMessageIds);
excludeStartMessageIds,
getSubscriptionName());

if (!running) {
return;
Expand All @@ -642,7 +643,8 @@ protected PulsarFetcher<T> createFetcher(
ClassLoader userCodeClassLoader,
StreamingRuntimeContext streamingRuntime,
boolean useMetrics,
Set<TopicRange> excludeStartMessageIds) throws Exception {
Set<TopicRange> excludeStartMessageIds,
String subscriptionName) throws Exception {

// readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName());

Expand All @@ -657,6 +659,7 @@ protected PulsarFetcher<T> createFetcher(
streamingRuntime,
clientConfigurationData,
readerConf,
subscriptionName,
pollTimeoutMs,
commitMaxRetries,
deserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ protected PulsarFetcher<T> createFetcher(
streamingRuntime,
clientConfigurationData,
readerConf,
getSubscriptionName(),
pollTimeoutMs,
commitMaxRetries,
deserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class PulsarFetcher<T> {

protected final Map<String, Object> readerConf;

protected final String subscriptionName;

protected final PulsarDeserializationSchema<T> deserializer;

protected final int pollTimeoutMs;
Expand Down Expand Up @@ -181,7 +183,8 @@ public PulsarFetcher(
PulsarDeserializationSchema<T> deserializer,
PulsarMetadataReader metadataReader,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
boolean useMetrics,
String subscriptionName) throws Exception {
this(
sourceContext,
seedTopicsWithInitialOffsets,
Expand All @@ -193,6 +196,7 @@ public PulsarFetcher(
runtimeContext,
clientConf,
readerConf,
subscriptionName,
pollTimeoutMs,
3, // commit retries before fail
deserializer,
Expand All @@ -212,6 +216,7 @@ public PulsarFetcher(
StreamingRuntimeContext runtimeContext,
ClientConfigurationData clientConf,
Map<String, Object> readerConf,
String subscriptionName,
int pollTimeoutMs,
int commitMaxRetries,
PulsarDeserializationSchema<T> deserializer,
Expand All @@ -221,6 +226,7 @@ public PulsarFetcher(

this.sourceContext = sourceContext;
this.watermarkOutput = new SourceContextWatermarkOutputAdapter<>(sourceContext);
this.subscriptionName = subscriptionName;
this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(watermarkOutput);
this.useMetrics = useMetrics;
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
Expand Down Expand Up @@ -525,7 +531,7 @@ protected ReaderThread<T> createReaderThread(ExceptionProxy exceptionProxy, Puls
exceptionProxy,
failOnDataLoss,
useEarliestWhenDataLoss,
excludeStartMessageIds.contains(state.getTopicRange()));
excludeStartMessageIds.contains(state.getTopicRange()), subscriptionName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class ReaderThread<T> extends Thread {
private boolean failOnDataLoss = true;
private boolean useEarliestWhenDataLoss = false;
private final PulsarCollector pulsarCollector;
private final String subscriptionName;

protected volatile boolean running = true;
protected volatile boolean closed = false;
Expand All @@ -78,17 +79,18 @@ public ReaderThread(
Map<String, Object> readerConf,
PulsarDeserializationSchema<T> deserializer,
int pollTimeoutMs,
ExceptionProxy exceptionProxy) {
ExceptionProxy exceptionProxy,
String subscriptionName) {
this.owner = owner;
this.state = state;
this.clientConf = clientConf;
this.readerConf = readerConf;
this.deserializer = deserializer;
this.pollTimeoutMs = pollTimeoutMs;
this.exceptionProxy = exceptionProxy;

this.topicRange = state.getTopicRange();
this.startMessageId = state.getOffset();
this.subscriptionName = subscriptionName;
this.pulsarCollector = new PulsarCollector();
}

Expand All @@ -102,8 +104,10 @@ public ReaderThread(
ExceptionProxy exceptionProxy,
boolean failOnDataLoss,
boolean useEarliestWhenDataLoss,
boolean excludeMessageId) {
this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy);
boolean excludeMessageId,
String subscriptionName) {
this(owner, state, clientConf, readerConf, deserializer, pollTimeoutMs, exceptionProxy,
subscriptionName);
this.failOnDataLoss = failOnDataLoss;
this.useEarliestWhenDataLoss = useEarliestWhenDataLoss;
this.excludeMessageId = excludeMessageId;
Expand Down Expand Up @@ -141,6 +145,7 @@ protected void createActualReader() throws PulsarClientException {
.newReader(deserializer.getSchema())
.topic(topicRange.getTopic())
.startMessageId(startMessageId)
.subscriptionName(subscriptionName)
.loadConf(readerConf);
log.info("Create a reader at topic {} starting from message {} (inclusive) : config = {}",
topicRange, startMessageId, readerConf);
Expand Down

0 comments on commit 3963987

Please sign in to comment.