From e13370d747fe7917a53051a93fe321acb4d982aa Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Wed, 20 Sep 2023 23:46:45 +0800 Subject: [PATCH 1/4] Add isolationLevel in consumer side. --- .../pulsar/client/api/ConsumerBuilder.java | 10 ++++ .../api/SubscriptionIsolationLevel.java | 47 +++++++++++++++++++ .../client/impl/ConsumerBuilderImpl.java | 7 +++ .../pulsar/client/impl/ConsumerImpl.java | 7 ++- .../impl/conf/ConsumerConfigurationData.java | 9 ++++ .../client/impl/ConsumerBuilderImplTest.java | 5 +- .../pulsar/common/protocol/Commands.java | 12 +++-- pulsar-common/src/main/proto/PulsarApi.proto | 8 ++++ 8 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 01f205a3afde5..a840aff06395a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -831,6 +831,16 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled); + /** + * Sets the {@link SubscriptionIsolationLevel} for the consumer. + * + * @param subscriptionIsolationLevel If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed, + * else if READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted. + * Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be configured with the same IsolationLevel. + * @return the consumer builder instance + */ + ConsumerBuilder subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel); + /** * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java new file mode 100644 index 0000000000000..2e66a2b66e31b --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * When creating a consumer, if the subscription does not exist, a new subscription will be created. + * The default isolation level for Subscription is 'READ_COMMITTED'. + * See {@link #subscriptionIsolationLevel(SubscriptionIsolationLevel)} to configure the isolation level behavior. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum SubscriptionIsolationLevel { + // Consumer can only consume all transactional messages which have been committed. + READ_COMMITTED(0), + + // Consumer can consume all messages, even transactional messages which have been aborted. + READ_UNCOMMITTED(1); + + private final int value; + + SubscriptionIsolationLevel(int value) { + this.value = value; + } + + public final int getValue() { + return value; + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index f644c6a18398f..3ffa39843efd3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicConsumerBuilder; @@ -539,6 +540,12 @@ public ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled) { return this; } + @Override + public ConsumerBuilder subscriptionIsolationLevel(SubscriptionIsolationLevel subscriptionIsolationLevel) { + conf.setSubscriptionIsolationLevel(subscriptionIsolationLevel); + return this; + } + @Override public TopicConsumerBuilder topicConfiguration(String topicName) { TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index aa3340c6078ef..04814a59b6f0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMessageId; @@ -99,6 +100,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; import org.apache.pulsar.common.api.proto.CommandMessage; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.EncryptionKeys; import org.apache.pulsar.common.api.proto.KeyValue; @@ -206,6 +208,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final boolean createTopicIfDoesNotExist; private final boolean poolMessages; + private final SubscriptionIsolationLevel isolationLevel; private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); @@ -290,6 +293,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull(); this.poolMessages = conf.isPoolMessages(); this.paused = conf.isStartPaused(); + this.isolationLevel = conf.getSubscriptionIsolationLevel(); if (client.getConfiguration().getStatsIntervalSeconds() > 0) { stats = new ConsumerStatsRecorderImpl(client, conf, this); @@ -836,7 +840,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. - conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this)); + conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), + IsolationLevel.valueOf(isolationLevel.getValue())); cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 8760926792cd7..3ad7d289e6041 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; @Data @NoArgsConstructor @@ -398,6 +399,14 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; + @ApiModelProperty( + name = "IsolationLevel", + value = "Consumer can specify subscription IsolationLevel.\n" + + "If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed.\n" + + "If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted." + ) + private SubscriptionIsolationLevel subscriptionIsolationLevel = SubscriptionIsolationLevel.READ_COMMITTED; + private List topicConfigurations = new ArrayList<>(); public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 3fe136630462f..c59988687d30d 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -441,7 +442,8 @@ public void testLoadConf() throws Exception { + " 'ackReceiptEnabled' : true,\n" + " 'poolMessages' : true,\n" + " 'startPaused' : true,\n" - + " 'autoScaledReceiverQueueSizeEnabled' : true\n" + + " 'autoScaledReceiverQueueSizeEnabled' : true,\n" + + " 'subscriptionIsolationLevel' : 'READ_UNCOMMITTED'\n" + " }").replace("'", "\""); Map conf = new ObjectMapper().readValue(jsonConf, new TypeReference>() {}); @@ -498,6 +500,7 @@ public void testLoadConf() throws Exception { assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry"); assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "new-dlq-sub"); assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2); + assertEquals(configurationData.getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED); assertTrue(configurationData.isRetryEnable()); assertFalse(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cf0cd820a6d10..86d18c69eb417 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -84,6 +84,7 @@ import org.apache.pulsar.common.api.proto.CommandSend; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; @@ -573,18 +574,18 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null, - true /* createTopicIfDoesNotExist */); + true /* createTopicIfDoesNotExist */, IsolationLevel.READ_COMMITTED); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, - boolean createTopicIfDoesNotExist) { + boolean createTopicIfDoesNotExist, IsolationLevel isolationLevel) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, - Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH); + Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, isolationLevel); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, @@ -592,7 +593,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu Map metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, - Map subscriptionProperties, long consumerEpoch) { + Map subscriptionProperties, long consumerEpoch, IsolationLevel isolationLevel) { BaseCommand cmd = localCmd(Type.SUBSCRIBE); CommandSubscribe subscribe = cmd.setSubscribe() .setTopic(topic) @@ -607,7 +608,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setInitialPosition(subscriptionInitialPosition) .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) - .setConsumerEpoch(consumerEpoch); + .setConsumerEpoch(consumerEpoch) + .setIsolationLevel(isolationLevel); if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..c3a7641fe0a4b 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -399,6 +399,14 @@ message CommandSubscribe { // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch optional uint64 consumer_epoch = 19; + + enum IsolationLevel { + READ_COMMITTED = 0; + READ_UNCOMMITTED = 1; + } + // If READ_COMMITTED is selected, the Consumer can only consume all transactional messages which have been committed. + // If READ_UNCOMMITTED is selected, the Consumer can consume all messages, even transactional messages which have been aborted. + optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; } message CommandPartitionedTopicMetadata { From 05a5c46233ee135fbe7188f22535e966ef78ff40 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Sun, 24 Sep 2023 21:16:30 +0800 Subject: [PATCH 2/4] Add isolationLevel logic in broker side. --- .../broker/service/AbstractSubscription.java | 23 +++++ .../pulsar/broker/service/ServerCnx.java | 3 + .../pulsar/broker/service/Subscription.java | 3 + .../broker/service/SubscriptionOption.java | 1 + .../apache/pulsar/broker/service/Topic.java | 4 +- .../NonPersistentSubscription.java | 9 ++ .../nonpersistent/NonPersistentTopic.java | 13 ++- ...PersistentDispatcherMultipleConsumers.java | 11 ++- ...sistentDispatcherSingleActiveConsumer.java | 8 +- .../persistent/PersistentSubscription.java | 8 ++ .../service/persistent/PersistentTopic.java | 15 +++- .../service/AbstractSubscriptionTest.java | 15 ++++ .../pulsar/broker/service/ServerCnxTest.java | 28 +++++++ .../transaction/TransactionConsumeTest.java | 84 +++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 19 +++-- 15 files changed, 227 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java index 10e6b79609721..f1b175c3b7f4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java @@ -18,14 +18,37 @@ */ package org.apache.pulsar.broker.service; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; + +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.LongAdder; import java.util.function.ToLongFunction; public abstract class AbstractSubscription implements Subscription { + protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level"; protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); protected final LongAdder msgOutFromRemovedConsumer = new LongAdder(); + public static void wrapIsolationLevelToProperties(Map properties, IsolationLevel isolationLevel) { + if (properties != null) { + properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue())); + } + } + + public IsolationLevel fetchIsolationLevelFromProperties(Map properties) { + if (properties == null) { + return IsolationLevel.READ_COMMITTED; + } + + if (properties.containsKey(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)) { + IsolationLevel isolationLevel = IsolationLevel.valueOf(Integer.parseInt(properties.get(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY))); + return isolationLevel != null ? isolationLevel : IsolationLevel.READ_COMMITTED; + } else { + return IsolationLevel.READ_COMMITTED; + } + } + public long getMsgOutCounter() { return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0517fff0f03f5..d04bd5dad5596 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -124,6 +124,7 @@ import org.apache.pulsar.common.api.proto.CommandSend; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; @@ -1145,6 +1146,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta; final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH; + final IsolationLevel isolationLevel = subscribe.getIsolationLevel(); final Optional> subscriptionProperties = SubscriptionOption.getPropertiesMap( subscribe.getSubscriptionPropertiesList()); @@ -1251,6 +1253,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .subscriptionProperties(subscriptionProperties) .consumerEpoch(consumerEpoch) .schemaType(schema == null ? null : schema.getType()) + .isolationLevel(isolationLevel) .build(); if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) { return topic.addSchemaIfIdleOrCheckCompatible(schema) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index be079c2b4b5d3..d741cf08d72db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @@ -96,6 +97,8 @@ default long getNumberOfEntriesDelayed() { String getTypeString(); + IsolationLevel getIsolationLevel(); + void addUnAckedMessages(int unAckMessages); Map getSubscriptionProperties(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..045e7a6d0c11a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -45,6 +45,7 @@ public class SubscriptionOption { private Map metadata; private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; + private CommandSubscribe.IsolationLevel isolationLevel; private long startMessageRollbackDurationSec; private boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7657d77e1299f..09c9bb07a15a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -170,7 +171,8 @@ CompletableFuture subscribe(TransportCnx cnx, String subscriptionName, Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicateSubscriptionState, - KeySharedMeta keySharedMeta); + KeySharedMeta keySharedMeta, + IsolationLevel isolationLevel); /** * Subscribe a topic. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 7cd4d8984c8f4..376fd0288a6d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -41,6 +41,7 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -70,6 +71,8 @@ public class NonPersistentSubscription extends AbstractSubscription implements S private KeySharedMode keySharedMode = null; + private final IsolationLevel isolationLevel; + public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, Map properties) { this.topic = topic; @@ -79,6 +82,7 @@ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionNa IS_FENCED_UPDATER.set(this, FALSE); this.subscriptionProperties = properties != null ? Collections.unmodifiableMap(properties) : Collections.emptyMap(); + this.isolationLevel = fetchIsolationLevelFromProperties(properties); } @Override @@ -236,6 +240,11 @@ public String getTypeString() { return "Null"; } + @Override + public IsolationLevel getIsolationLevel() { + return this.isolationLevel; + } + @Override public CompletableFuture clearBacklog() { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 639d2cfc5810f..03698bc8543c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -43,6 +43,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.AbstractReplicator; +import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -69,6 +70,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.naming.TopicName; @@ -263,7 +265,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null), - option.getSchemaType()); + option.getSchemaType(), option.getIsolationLevel()); } @Override @@ -273,10 +275,11 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs Map metadata, boolean readCompacted, InitialPosition initialPosition, long resetStartMessageBackInSec, boolean replicateSubscriptionState, - KeySharedMeta keySharedMeta) { + KeySharedMeta keySharedMeta, + IsolationLevel isolationLevel) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, startMessageId, metadata, readCompacted, resetStartMessageBackInSec, - replicateSubscriptionState, keySharedMeta, null, null); + replicateSubscriptionState, keySharedMeta, null, null, isolationLevel); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -287,7 +290,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St boolean replicateSubscriptionState, KeySharedMeta keySharedMeta, Map subscriptionProperties, - SchemaType schemaType) { + SchemaType schemaType, + IsolationLevel isolationLevel) { return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { final CompletableFuture future = new CompletableFuture<>(); @@ -325,6 +329,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St lock.readLock().unlock(); } + AbstractSubscription.wrapIsolationLevelToProperties(subscriptionProperties, isolationLevel); NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> new NonPersistentSubscription(this, subscriptionName, subscriptionProperties)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index b3d48252efe58..38ef5feffbf33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -69,6 +69,7 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; @@ -330,6 +331,7 @@ public synchronized void readMoreEntries() { minReplayedPosition = null; } + PositionImpl maxReadPosition = getMaxReadPosition(); // Filter out and skip read delayed messages exist in DelayedDeliveryTracker if (delayedDeliveryTracker.isPresent()) { Predicate skipCondition = null; @@ -339,10 +341,10 @@ public synchronized void readMoreEntries() { .containsMessage(position.getLedgerId(), position.getEntryId()); } cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, - topic.getMaxReadPosition(), skipCondition); + maxReadPosition, skipCondition); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, - topic.getMaxReadPosition()); + maxReadPosition); } } else { log.debug("[{}] Cannot schedule next read until previous one is done", name); @@ -354,6 +356,11 @@ public synchronized void readMoreEntries() { } } + private PositionImpl getMaxReadPosition() { + return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ? + topic.getMaxReadPosition() : PositionImpl.LATEST; + } + @Override protected void reScheduleRead() { if (isRescheduleReadInProgress.compareAndSet(false, true)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index d96429693fda8..e0511ae9219ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactedTopicUtils; @@ -356,7 +357,7 @@ protected void readMoreEntries(Consumer consumer) { ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()); cursor.asyncReadEntriesOrWait(messagesToRead, - bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition()); + bytesToRead, this, readEntriesCtx, getMaxReadPosition()); } } } else { @@ -366,6 +367,11 @@ protected void readMoreEntries(Consumer consumer) { } } + private PositionImpl getMaxReadPosition() { + return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ? + topic.getMaxReadPosition() : PositionImpl.LATEST; + } + @Override protected void reScheduleRead() { if (isRescheduleReadInProgress.compareAndSet(false, true)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1f6f688d86fe1..fa451008acff0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -77,6 +77,7 @@ import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -127,6 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; + private final IsolationLevel subscriptionIsolationLevel; private volatile CompletableFuture fenceFuture; static Map getBaseCursorProperties(boolean isReplicated) { @@ -160,6 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.pendingAckHandle = new PendingAckHandleDisabled(); } IS_FENCED_UPDATER.set(this, FALSE); + this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); } public void updateLastMarkDeleteAdvancedTimestamp() { @@ -506,6 +509,11 @@ public String getTypeString() { return "Null"; } + @Override + public IsolationLevel getIsolationLevel() { + return subscriptionIsolationLevel; + } + @Override public CompletableFuture analyzeBacklog(Optional position) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dfeb03a254698..802d7463c2f5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -88,6 +88,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; import org.apache.pulsar.broker.service.AbstractReplicator; +import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -133,6 +134,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -797,7 +799,8 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), - option.getConsumerEpoch(), option.getSchemaType()); + option.getConsumerEpoch(), option.getSchemaType(), + option.getIsolationLevel()); } private CompletableFuture internalSubscribe(final TransportCnx cnx, String subscriptionName, @@ -811,7 +814,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, - SchemaType schemaType) { + SchemaType schemaType, + IsolationLevel isolationLevel) { if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) { return FutureUtil.failedFuture(new NotAllowedException( "readCompacted only allowed on failover or exclusive subscriptions")); @@ -890,6 +894,8 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St lock.readLock().unlock(); } + + AbstractSubscription.wrapIsolationLevelToProperties(subscriptionProperties, isolationLevel); CompletableFuture subscriptionFuture = isDurable ? // getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState, subscriptionProperties) @@ -975,10 +981,11 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicatedSubscriptionStateArg, - KeySharedMeta keySharedMeta) { + KeySharedMeta keySharedMeta, + IsolationLevel isolationLevel) { return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null); + replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, isolationLevel); } private CompletableFuture getDurableSubscription(String subscriptionName, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java index 82554c74ef7b7..b075ca1ee2464 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java @@ -22,7 +22,13 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -54,4 +60,13 @@ public void testGetBytesOutCounter() { when(consumer.getBytesOutCounter()).thenReturn(2L); assertEquals(subscription.getBytesOutCounter(), 3L); } + + @Test + public void testWrapAndFetchIsolationLevelInProperties() { + Map properties = new HashMap<>(1); + AbstractSubscription.wrapIsolationLevelToProperties(properties, IsolationLevel.READ_UNCOMMITTED); + assertTrue(properties.containsKey(AbstractSubscription.SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)); + assertEquals(Integer.valueOf(properties.get(AbstractSubscription.SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)), + IsolationLevel.READ_UNCOMMITTED.getValue()); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5fd4881981365..f5ae4cf41ff86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -124,6 +124,7 @@ import org.apache.pulsar.common.api.proto.CommandSendReceipt; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; @@ -2295,6 +2296,33 @@ public void testSubscribeCommand() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testSubscribeCommandWithIsolationLevel() throws Exception { + resetChannel(); + setChannelConnected(); + svcConfig.setAuthenticationEnabled(false); + svcConfig.setAuthorizationEnabled(false); + // test SUBSCRIBE on topic and cursor creation success with 'ReadUnCommitted' isolation level. + ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // + successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, + "test" /* consumer name */, 0 /* avoid reseting cursor */, IsolationLevel.READ_UNCOMMITTED); + channel.writeInbound(clientCommand); + assertTrue(getResponse() instanceof CommandSuccess); + + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); + + assertNotNull(topicRef); + assertTrue(topicRef.getSubscriptions().containsKey(successSubName)); + assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected()); + assertEquals(topicRef.getSubscription(successSubName).getIsolationLevel(), IsolationLevel.READ_UNCOMMITTED); + + // Server will not close the connection + assertTrue(channel.isOpen()); + + channel.finish(); + } + + @Test(timeOut = 30000) public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { final String failSubName = "failSub"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index 0e17719aca7e7..b17fdb1c3ac6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.MessageIdData; @@ -228,6 +229,89 @@ public void sortedTest() throws Exception { log.info("TransactionConsumeTest sortedTest finish."); } + @Test + public void testConsumeMessageWithDifferentIsolationLevel() throws Exception { + int messageCntBeforeTxn = 10; + int transactionMessageCnt = 10; + int messageCntAfterTxn = 10; + int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(CONSUME_TOPIC) + .create(); + + @Cleanup + Consumer consumerWithReadCommittedIsolationLevel = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("read-committed-isolation-test") + .subscribe(); + + @Cleanup + Consumer consumerWithReadUnCommittedIsolationLevel = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("read-unCommitted-isolation-test") + .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED) + .subscribe(); + + Awaitility.await().until(consumerWithReadUnCommittedIsolationLevel::isConnected); + + long mostSigBits = 2L; + long leastSigBits = 5L; + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(CONSUME_TOPIC, false).get().get(); + log.info("transactionBuffer init finish."); + + List sendMessageList = new ArrayList<>(); + sendNormalMessages(producer, 0, messageCntBeforeTxn, sendMessageList); + appendTransactionMessages(txnID, persistentTopic, transactionMessageCnt, sendMessageList); + sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn, sendMessageList); + + Message message; + + for (int i = 0; i < totalMsgCnt; i++) { + // 1. for consumer 'consumerWithReadUnCommittedIsolationLevel', Because the transaction isolation level is ReadUncommitted, all messages can be read + message = consumerWithReadUnCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNotNull(message); + if (i < messageCntBeforeTxn) { + log.info("Consumer with ReadUnCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8)); + } else { + if (i < messageCntBeforeTxn + transactionMessageCnt) { + log.info("Consumer with ReadUnCommittedIsolationLevel Receive txn id: {}, msg: {}", message.getMessageId(), new String(message.getData())); + } else { + log.info("Consumer with ReadUnCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8)); + } + } + + // 2. for consumer 'consumerWithReadCommittedIsolationLevel', Because the transaction isolation level is ReadUnCommitted, + // it can only read 'messageCntBeforeTxn' messages before the transaction is committed + if (i < messageCntBeforeTxn) { + message = consumerWithReadCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNotNull(message); + log.info("Consumer with ReadCommittedIsolationLevel Receive normal msg: {}" + new String(message.getData(), UTF_8)); + } else { + message = consumerWithReadCommittedIsolationLevel.receive(500, TimeUnit.MILLISECONDS); + Assert.assertNull(message); + log.info("Consumer with ReadCommittedIsolationLevel can't receive message before commit."); + } + } + + // Now commit the transaction + persistentTopic.endTxn(txnID, TxnAction.COMMIT_VALUE, 0L).get(); + log.info("Commit txn."); + + // 'consumerWithReadCommittedIsolationLevel' receive transaction messages successfully after commit + for (int i = 0; i < transactionMessageCnt + messageCntAfterTxn; i++) { + message = consumerWithReadCommittedIsolationLevel.receive(5, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Consumer with ReadCommittedIsolationLevel Receive txn id: {}, msg: {}", message.getMessageId(), new String(message.getData())); + } + + log.info("Consume message with different isolation level test finish."); + } + @Test public void testMessageRedelivery() throws Exception { int transactionMessageCnt = 10; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 86d18c69eb417..4c851913e127a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -570,7 +570,7 @@ public static ByteBufPair newSend(long producerId, long sequenceId, long highest } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, - SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds) { + SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null, @@ -578,10 +578,19 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, - SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, - InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, - boolean createTopicIfDoesNotExist, IsolationLevel isolationLevel) { + SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds, + IsolationLevel isolationLevel) { + return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, + true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, + false /* isReplicated */, InitialPosition.Earliest, resetStartMessageBackInSeconds, null, + true /* createTopicIfDoesNotExist */, isolationLevel); + } + + public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, + SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, + Map metadata, boolean readCompacted, boolean isReplicated, + InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, + boolean createTopicIfDoesNotExist, IsolationLevel isolationLevel) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, From 7eee90c950b08b4952936a51c5632f3d1810b670 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Sat, 7 Oct 2023 21:39:17 +0800 Subject: [PATCH 3/4] Add stats 'subscriptionIsolationLevel' in SubscriptionStatsImpl --- .../NonPersistentSubscription.java | 1 + .../persistent/PersistentSubscription.java | 7 ++++--- .../pulsar/broker/admin/AdminApiTest.java | 20 +++++++++++++++++++ .../policies/data/SubscriptionStats.java | 3 +++ .../data/stats/SubscriptionStatsImpl.java | 7 +++++++ 5 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 376fd0288a6d4..56eea830fad3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -472,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() { subStats.type = getTypeString(); subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); KeySharedMode keySharedMode = this.keySharedMode; if (getType() == SubType.Key_Shared && keySharedMode != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fa451008acff0..91ad5411ae592 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -128,7 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; private volatile Map subscriptionProperties; - private final IsolationLevel subscriptionIsolationLevel; + private final IsolationLevel isolationLevel; private volatile CompletableFuture fenceFuture; static Map getBaseCursorProperties(boolean isReplicated) { @@ -162,7 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.pendingAckHandle = new PendingAckHandleDisabled(); } IS_FENCED_UPDATER.set(this, FALSE); - this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); + this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties); } public void updateLastMarkDeleteAdvancedTimestamp() { @@ -511,7 +511,7 @@ public String getTypeString() { @Override public IsolationLevel getIsolationLevel() { - return subscriptionIsolationLevel; + return isolationLevel; } @Override @@ -1195,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.isReplicated = isReplicated(); subStats.subscriptionProperties = subscriptionProperties; subStats.isDurable = cursor.isDurable(); + subStats.subscriptionIsolationLevel = this.isolationLevel.toString(); if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) { PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) dispatcher; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index e97707710d743..0dd886983408e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -99,6 +99,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionIsolationLevel; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.lookup.data.LookupData; @@ -1331,6 +1332,25 @@ public void testGetPartitionedStatsContainSubscriptionType() throws Exception { assertEquals(topicStats.getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString()); } + @Test + public void testGetPartitionedStatsContainSubscriptionIsolationLevel() throws Exception { + final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID(); + final int numPartitions = 4; + admin.topics().createPartitionedTopic(topic, numPartitions); + + // create consumer and subscription + final String subName = "my-sub"; + @Cleanup Consumer exclusiveConsumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionIsolationLevel(SubscriptionIsolationLevel.READ_UNCOMMITTED) + .subscribe(); + + TopicStats topicStats = admin.topics().getPartitionedStats(topic, false); + assertEquals(topicStats.getSubscriptions().size(), 1); + assertEquals(topicStats.getSubscriptions().get(subName).getSubscriptionIsolationLevel(), SubscriptionIsolationLevel.READ_UNCOMMITTED.toString()); + } + @Test public void testGetPartitionedStatsInternal() throws Exception { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 9ff94a2952ea3..ca39491ccd1dc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -136,4 +136,7 @@ public interface SubscriptionStats { long getFilterRescheduledMsgCount(); long getDelayedMessageIndexSizeInBytes(); + + /** The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}. */ + String getSubscriptionIsolationLevel(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index bed8aabf27d8d..10a7a722ae531 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -149,6 +149,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats { public long filterRescheduledMsgCount; + /** + * The subscription isolationLevel as defined by {@link org.apache.pulsar.client.api.SubscriptionIsolationLevel}. + */ + public String subscriptionIsolationLevel; + public SubscriptionStatsImpl() { this.consumers = new ArrayList<>(); this.consumersAfterMarkDeletePosition = new LinkedHashMap<>(); @@ -185,6 +190,7 @@ public void reset() { filterRejectedMsgCount = 0; filterRescheduledMsgCount = 0; bucketDelayedIndexStats.clear(); + subscriptionIsolationLevel = null; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -239,6 +245,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { this.filterAcceptedMsgCount += stats.filterAcceptedMsgCount; this.filterRejectedMsgCount += stats.filterRejectedMsgCount; this.filterRescheduledMsgCount += stats.filterRescheduledMsgCount; + this.subscriptionIsolationLevel = stats.subscriptionIsolationLevel; stats.bucketDelayedIndexStats.forEach((k, v) -> { TopicMetricBean topicMetricBean = this.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean()); From 471297921b76312996abcbd4a9262d43bdc5b366 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Tue, 24 Oct 2023 21:44:42 +0800 Subject: [PATCH 4/4] return 'topic.getManagedLedger().getLastConfirmedEntry()' when the subscription isolation level is ReadUnCommitted. --- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- .../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 38ef5feffbf33..d2207a31f990f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -358,7 +358,7 @@ public synchronized void readMoreEntries() { private PositionImpl getMaxReadPosition() { return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ? - topic.getMaxReadPosition() : PositionImpl.LATEST; + topic.getMaxReadPosition() : (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index e0511ae9219ff..2563e3f49210d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -369,7 +369,7 @@ protected void readMoreEntries(Consumer consumer) { private PositionImpl getMaxReadPosition() { return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ? - topic.getMaxReadPosition() : PositionImpl.LATEST; + topic.getMaxReadPosition() : (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); } @Override