Skip to content

Commit

Permalink
Add isolationLevel in consumer side.
Browse files Browse the repository at this point in the history
  • Loading branch information
hzh0425 committed Sep 25, 2023
1 parent 5c9b72a commit e13370d
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,16 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

/**
* Sets the {@link SubscriptionIsolationLevel} for the consumer.
*
* @param subscriptionIsolationLevel If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed,
* else if READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted.
* Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be configured with the same IsolationLevel.
* @return the consumer builder instance
*/
ConsumerBuilder<T> subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel);

/**
* Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* When creating a consumer, if the subscription does not exist, a new subscription will be created.
* The default isolation level for Subscription is 'READ_COMMITTED'.
* See {@link #subscriptionIsolationLevel(SubscriptionIsolationLevel)} to configure the isolation level behavior.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public enum SubscriptionIsolationLevel {
// Consumer can only consume all transactional messages which have been committed.
READ_COMMITTED(0),

// Consumer can consume all messages, even transactional messages which have been aborted.
READ_UNCOMMITTED(1);

private final int value;

SubscriptionIsolationLevel(int value) {
this.value = value;
}

public final int getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicConsumerBuilder;
Expand Down Expand Up @@ -539,6 +540,12 @@ public ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled) {
return this;
}

@Override
public ConsumerBuilder<T> subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel) {
conf.setSubscriptionIsolationLevel(subscriptionIsolationLevel);
return this;
}

@Override
public TopicConsumerBuilder<T> topicConfiguration(String topicName) {
TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
Expand All @@ -99,6 +100,7 @@
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.KeyValue;
Expand Down Expand Up @@ -206,6 +208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final boolean createTopicIfDoesNotExist;
private final boolean poolMessages;
private final SubscriptionIsolationLevel isolationLevel;

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
Expand Down Expand Up @@ -290,6 +293,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
this.paused = conf.isStartPaused();
this.isolationLevel = conf.getSubscriptionIsolationLevel();

if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
Expand Down Expand Up @@ -836,7 +840,8 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this),
IsolationLevel.valueOf(isolationLevel.getValue()));

cnx.sendRequestWithId(request, requestId).thenRun(() -> {
synchronized (ConsumerImpl.this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;

@Data
@NoArgsConstructor
Expand Down Expand Up @@ -398,6 +399,14 @@ public int getMaxPendingChuckedMessage() {

private boolean autoScaledReceiverQueueSizeEnabled = false;

@ApiModelProperty(
name = "IsolationLevel",
value = "Consumer can specify subscription IsolationLevel.\n" +
"If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed.\n" +
"If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted."
)
private SubscriptionIsolationLevel subscriptionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED;

private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();

public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -441,7 +442,8 @@ public void testLoadConf() throws Exception {
+ " 'ackReceiptEnabled' : true,\n"
+ " 'poolMessages' : true,\n"
+ " 'startPaused' : true,\n"
+ " 'autoScaledReceiverQueueSizeEnabled' : true\n"
+ " 'autoScaledReceiverQueueSizeEnabled' : true,\n"
+ " 'subscriptionIsolationLevel' : 'READ_UNCOMMITTED'\n"
+ " }").replace("'", "\"");

Map<String, Object> conf = new ObjectMapper().readValue(jsonConf, new TypeReference<HashMap<String,Object>>() {});
Expand Down Expand Up @@ -498,6 +500,7 @@ public void testLoadConf() throws Exception {
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry");
assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "new-dlq-sub");
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2);
assertEquals(configurationData.getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED);
assertTrue(configurationData.isRetryEnable());
assertFalse(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
Expand Down Expand Up @@ -573,26 +574,26 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null,
true /* createTopicIfDoesNotExist */);
true /* createTopicIfDoesNotExist */, IsolationLevel.READ_COMMITTED);
}

public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo,
boolean createTopicIfDoesNotExist) {
boolean createTopicIfDoesNotExist, IsolationLevel isolationLevel) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition,
startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH);
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, isolationLevel);
}

public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
Map<String, String> subscriptionProperties, long consumerEpoch) {
Map<String, String> subscriptionProperties, long consumerEpoch, IsolationLevel isolationLevel) {
BaseCommand cmd = localCmd(Type.SUBSCRIBE);
CommandSubscribe subscribe = cmd.setSubscribe()
.setTopic(topic)
Expand All @@ -607,7 +608,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
.setInitialPosition(subscriptionInitialPosition)
.setReplicateSubscriptionState(isReplicated)
.setForceTopicCreation(createTopicIfDoesNotExist)
.setConsumerEpoch(consumerEpoch);
.setConsumerEpoch(consumerEpoch)
.setIsolationLevel(isolationLevel);

if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) {
List<KeyValue> keyValues = new ArrayList<>();
Expand Down
8 changes: 8 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ message CommandSubscribe {

// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
optional uint64 consumer_epoch = 19;

enum IsolationLevel {
READ_COMMITTED = 0;
READ_UNCOMMITTED = 1;
}
// If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed.
// If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted.
optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED];
}

message CommandPartitionedTopicMetadata {
Expand Down

0 comments on commit e13370d

Please sign in to comment.