From e13370d747fe7917a53051a93fe321acb4d982aa Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Wed, 20 Sep 2023 23:46:45 +0800 Subject: [PATCH] Add isolationLevel in consumer side. --- .../pulsar/client/api/ConsumerBuilder.java | 10 ++++ .../api/SubscriptionIsolationLevel.java | 47 +++++++++++++++++++ .../client/impl/ConsumerBuilderImpl.java | 7 +++ .../pulsar/client/impl/ConsumerImpl.java | 7 ++- .../impl/conf/ConsumerConfigurationData.java | 9 ++++ .../client/impl/ConsumerBuilderImplTest.java | 5 +- .../pulsar/common/protocol/Commands.java | 12 +++-- pulsar-common/src/main/proto/PulsarApi.proto | 8 ++++ 8 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 01f205a3afde5..a840aff06395a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -831,6 +831,16 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled); + /** + * Sets the {@link SubscriptionIsolationLevel} for the consumer. + * + * @param subscriptionIsolationLevel If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed, + * else if READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted. + * Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be configured with the same IsolationLevel. + * @return the consumer builder instance + */ + ConsumerBuilder subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel); + /** * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java new file mode 100644 index 0000000000000..2e66a2b66e31b --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * When creating a consumer, if the subscription does not exist, a new subscription will be created. + * The default isolation level for Subscription is 'READ_COMMITTED'. + * See {@link #subscriptionIsolationLevel(SubscriptionIsolationLevel)} to configure the isolation level behavior. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum SubscriptionIsolationLevel { + // Consumer can only consume all transactional messages which have been committed. + READ_COMMITTED(0), + + // Consumer can consume all messages, even transactional messages which have been aborted. + READ_UNCOMMITTED(1); + + private final int value; + + SubscriptionIsolationLevel(int value) { + this.value = value; + } + + public final int getValue() { + return value; + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f644c6a18398f..3ffa39843efd3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicConsumerBuilder; @@ -539,6 +540,12 @@ public ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled) { return this; } + @Override + public ConsumerBuilder subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel) { + conf.setSubscriptionIsolationLevel(subscriptionIsolationLevel); + return this; + } + @Override public TopicConsumerBuilder topicConfiguration(String topicName) { TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index aa3340c6078ef..04814a59b6f0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMessageId; @@ -99,6 +100,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; import org.apache.pulsar.common.api.proto.CommandMessage; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.EncryptionKeys; import org.apache.pulsar.common.api.proto.KeyValue; @@ -206,6 +208,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final boolean createTopicIfDoesNotExist; private final boolean poolMessages; + private final SubscriptionIsolationLevel isolationLevel; private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); @@ -290,6 +293,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull(); this.poolMessages = conf.isPoolMessages(); this.paused = conf.isStartPaused(); + this.isolationLevel = conf.getSubscriptionIsolationLevel(); if (client.getConfiguration().getStatsIntervalSeconds() > 0) { stats = new ConsumerStatsRecorderImpl(client, conf, this); @@ -836,7 +840,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. - conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this)); + conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), + IsolationLevel.valueOf(isolationLevel.getValue())); cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 8760926792cd7..3ad7d289e6041 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; @Data @NoArgsConstructor @@ -398,6 +399,14 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; + @ApiModelProperty( + name = "IsolationLevel", + value = "Consumer can specify subscription IsolationLevel.\n" + + "If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed.\n" + + "If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted." + ) + private SubscriptionIsolationLevel subscriptionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED; + private List topicConfigurations = new ArrayList<>(); public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 3fe136630462f..c59988687d30d 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -441,7 +442,8 @@ public void testLoadConf() throws Exception { + " 'ackReceiptEnabled' : true,\n" + " 'poolMessages' : true,\n" + " 'startPaused' : true,\n" - + " 'autoScaledReceiverQueueSizeEnabled' : true\n" + + " 'autoScaledReceiverQueueSizeEnabled' : true,\n" + + " 'subscriptionIsolationLevel' : 'READ_UNCOMMITTED'\n" + " }").replace("'", "\""); Map conf = new ObjectMapper().readValue(jsonConf, new TypeReference>() {}); @@ -498,6 +500,7 @@ public void testLoadConf() throws Exception { assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry"); assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "new-dlq-sub"); assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2); + assertEquals(configurationData.getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED); assertTrue(configurationData.isRetryEnable()); assertFalse(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cf0cd820a6d10..86d18c69eb417 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -84,6 +84,7 @@ import org.apache.pulsar.common.api.proto.CommandSend; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; @@ -573,18 +574,18 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null, - true /* createTopicIfDoesNotExist */); + true /* createTopicIfDoesNotExist */, IsolationLevel.READ_COMMITTED); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, IsolationLevel isolationLevel) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, - Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); + Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, isolationLevel); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, @@ -592,7 +593,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, - Map subscriptionProperties, long consumerEpoch) { + Map subscriptionProperties, long consumerEpoch, IsolationLevel isolationLevel) { BaseCommand cmd = localCmd(Type.SUBSCRIBE); CommandSubscribe subscribe = cmd.setSubscribe() .setTopic(topic) @@ -607,7 +608,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setInitialPosition(subscriptionInitialPosition) .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) - .setConsumerEpoch(consumerEpoch); + .setConsumerEpoch(consumerEpoch) + .setIsolationLevel(isolationLevel); if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..c3a7641fe0a4b 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -399,6 +399,14 @@ message CommandSubscribe { // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch optional uint64 consumer_epoch = 19; + + enum IsolationLevel { + READ_COMMITTED = 0; + READ_UNCOMMITTED = 1; + } + // If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed. + // If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted. + optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; } message CommandPartitionedTopicMetadata {