Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn][PIP-298] Consumer supports specifying consumption isolation level #21246

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,37 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;

public abstract class AbstractSubscription implements Subscription {
protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
Copy link
Contributor

@poorbarcode poorbarcode Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is a prop for Transaction, we should make this prop name contain the keyword transaction, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for all the system properties, please add __ as a prefix.

protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
if (properties != null) {
Copy link
Contributor

@poorbarcode poorbarcode Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the argument properties is null, you discarded the prop isolationLevel, it is wrong. please add a test for this case, thanks

properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the subscription was created, the prop SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY can not be modified, right?

If yes, we should prevent modifying it by the API pulsar-admin topics update-subscription-properties. And please add a test for this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, why do we need to persist the isolation level to properties?
It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui

Sorry, why do we need to persist the isolation level to properties?
It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

For example:

  • there are 10 messages
    • 3:0~3:4(committed)
    • 3:5~3:9(transaction on-going)]
  • create a subscription with reading un-committed isolation level
    • the consumer committed 3:0~3~6
  • close all consumers
  • create a new consumer on the old subscription with read committed isolation level, (Highlight) the read position and marked deleted position will go to a strange value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should follow the subscription type. The subscription will use the isolation type that the first consumer provided.
After all consumer disconnected, a new consumer will get a chance to change the isolation level.

I agree that the subscription use the isolation type of the first consumer provided. But the isolation level should not be changed. Just like schema.

}
}

public IsolationLevel fetchIsolationLevelFromProperties(Map<String, String> properties) {
if (properties == null) {
return IsolationLevel.READ_COMMITTED;
}

if (properties.containsKey(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)) {
IsolationLevel isolationLevel = IsolationLevel.valueOf(Integer.parseInt(properties.get(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we catch the NumberFormatException here? And please add a test for this case.

return isolationLevel != null ? isolationLevel : IsolationLevel.READ_COMMITTED;
} else {
return IsolationLevel.READ_COMMITTED;
}
}

public long getMsgOutCounter() {
return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
Expand Down Expand Up @@ -1145,6 +1146,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final IsolationLevel isolationLevel = subscribe.getIsolationLevel();
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());

Expand Down Expand Up @@ -1251,6 +1253,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.isolationLevel(isolationLevel)
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;

Expand Down Expand Up @@ -96,6 +97,8 @@ default long getNumberOfEntriesDelayed() {

String getTypeString();

IsolationLevel getIsolationLevel();

void addUnAckedMessages(int unAckMessages);

Map<String, String> getSubscriptionProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SubscriptionOption {
private Map<String, String> metadata;
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private CommandSubscribe.IsolationLevel isolationLevel;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you modify this variable with final?

private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -170,7 +171,8 @@ CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName,
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta);
KeySharedMeta keySharedMeta,
IsolationLevel isolationLevel);

/**
* Subscribe a topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class NonPersistentSubscription extends AbstractSubscription implements S

private KeySharedMode keySharedMode = null;

private final IsolationLevel isolationLevel;

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName,
Map<String, String> properties) {
this.topic = topic;
Expand All @@ -79,6 +82,7 @@ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionNa
IS_FENCED_UPDATER.set(this, FALSE);
this.subscriptionProperties = properties != null
? Collections.unmodifiableMap(properties) : Collections.emptyMap();
this.isolationLevel = fetchIsolationLevelFromProperties(properties);
}

@Override
Expand Down Expand Up @@ -236,6 +240,11 @@ public String getTypeString() {
return "Null";
}

@Override
public IsolationLevel getIsolationLevel() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use @lombok.Getter instead of this method is better?

return this.isolationLevel;
}

@Override
public CompletableFuture<Void> clearBacklog() {
// No-op
Expand Down Expand Up @@ -463,6 +472,7 @@ public NonPersistentSubscriptionStatsImpl getStats() {

subStats.type = getTypeString();
subStats.msgDropRate = dispatcher.getMessageDropRate().getValueRate();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();

KeySharedMode keySharedMode = this.keySharedMode;
if (getType() == SubType.Key_Shared && keySharedMode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand All @@ -69,6 +70,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -263,7 +265,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
option.getSchemaType(), option.getIsolationLevel());
}

@Override
Expand All @@ -273,10 +275,11 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta,
IsolationLevel isolationLevel) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
startMessageId, metadata, readCompacted, resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta, null, null);
replicateSubscriptionState, keySharedMeta, null, null, isolationLevel);
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -287,7 +290,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
SchemaType schemaType) {
SchemaType schemaType,
IsolationLevel isolationLevel) {

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
Expand Down Expand Up @@ -325,6 +329,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
lock.readLock().unlock();
}

AbstractSubscription.wrapIsolationLevelToProperties(subscriptionProperties, isolationLevel);
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, subscriptionProperties));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
Expand Down Expand Up @@ -330,6 +331,7 @@ public synchronized void readMoreEntries() {
minReplayedPosition = null;
}

PositionImpl maxReadPosition = getMaxReadPosition();
// Filter out and skip read delayed messages exist in DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
Predicate<PositionImpl> skipCondition = null;
Expand All @@ -339,10 +341,10 @@ public synchronized void readMoreEntries() {
.containsMessage(position.getLedgerId(), position.getEntryId());
}
cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition(), skipCondition);
maxReadPosition, skipCondition);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition());
maxReadPosition);
}
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand All @@ -354,6 +356,11 @@ public synchronized void readMoreEntries() {
}
}

private PositionImpl getMaxReadPosition() {
return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ?
topic.getMaxReadPosition() : PositionImpl.LATEST;
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
Expand Down Expand Up @@ -356,7 +357,7 @@ protected void readMoreEntries(Consumer consumer) {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
bytesToRead, this, readEntriesCtx, getMaxReadPosition());
}
}
} else {
Expand All @@ -366,6 +367,11 @@ protected void readMoreEntries(Consumer consumer) {
}
}

private PositionImpl getMaxReadPosition() {
return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ?
topic.getMaxReadPosition() : PositionImpl.LATEST;
hzh0425 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private final IsolationLevel isolationLevel;
private volatile CompletableFuture<Void> fenceFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
Expand Down Expand Up @@ -160,6 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
this.isolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support users using a message that is read from a subscription typed READ_UNCOMMITTED for a new transaction?

  • If false, we should init the component pendingAckHandle typed PendingAckHandleDisabled when the isolationLevel is READ_UNCOMMITTED, right?
  • If yes, how can we skip the recover task for this pending ack?

And please add a test for this case.

}

public void updateLastMarkDeleteAdvancedTimestamp() {
Expand Down Expand Up @@ -506,6 +509,11 @@ public String getTypeString() {
return "Null";
}

@Override
public IsolationLevel getIsolationLevel() {
return isolationLevel;
}

@Override
public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {

Expand Down Expand Up @@ -1187,6 +1195,7 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.isReplicated = isReplicated();
subStats.subscriptionProperties = subscriptionProperties;
subStats.isDurable = cursor.isDurable();
subStats.subscriptionIsolationLevel = this.isolationLevel.toString();
if (getType() == SubType.Key_Shared && dispatcher instanceof PersistentStickyKeyDispatcherMultipleConsumers) {
PersistentStickyKeyDispatcherMultipleConsumers keySharedDispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) dispatcher;
Expand Down
Loading
Loading