Skip to content

Commit

Permalink
Add isolationLevel logic in broker side.
Browse files Browse the repository at this point in the history
  • Loading branch information
hzh0425 committed Sep 25, 2023
1 parent e13370d commit 05a5c46
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,37 @@
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.ToLongFunction;

public abstract class AbstractSubscription implements Subscription {
protected static final String SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY = "pulsar.subscription.isolation.level";
protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();

public static void wrapIsolationLevelToProperties(Map<String, String> properties, IsolationLevel isolationLevel) {
if (properties != null) {
properties.put(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY, String.valueOf(isolationLevel.getValue()));
}
}

public IsolationLevel fetchIsolationLevelFromProperties(Map<String, String> properties) {
if (properties == null) {
return IsolationLevel.READ_COMMITTED;
}

if (properties.containsKey(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)) {
IsolationLevel isolationLevel = IsolationLevel.valueOf(Integer.parseInt(properties.get(SUBSCRIPTION_ISOLATION_LEVEL_PROPERTY)));
return isolationLevel != null ? isolationLevel : IsolationLevel.READ_COMMITTED;
} else {
return IsolationLevel.READ_COMMITTED;
}
}

public long getMsgOutCounter() {
return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
Expand Down Expand Up @@ -1145,6 +1146,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final IsolationLevel isolationLevel = subscribe.getIsolationLevel();
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());

Expand Down Expand Up @@ -1251,6 +1253,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.isolationLevel(isolationLevel)
.build();
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;

Expand Down Expand Up @@ -96,6 +97,8 @@ default long getNumberOfEntriesDelayed() {

String getTypeString();

IsolationLevel getIsolationLevel();

void addUnAckedMessages(int unAckMessages);

Map<String, String> getSubscriptionProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SubscriptionOption {
private Map<String, String> metadata;
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private CommandSubscribe.IsolationLevel isolationLevel;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -170,7 +171,8 @@ CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName,
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta);
KeySharedMeta keySharedMeta,
IsolationLevel isolationLevel);

/**
* Subscribe a topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class NonPersistentSubscription extends AbstractSubscription implements S

private KeySharedMode keySharedMode = null;

private final IsolationLevel isolationLevel;

public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName,
Map<String, String> properties) {
this.topic = topic;
Expand All @@ -79,6 +82,7 @@ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionNa
IS_FENCED_UPDATER.set(this, FALSE);
this.subscriptionProperties = properties != null
? Collections.unmodifiableMap(properties) : Collections.emptyMap();
this.isolationLevel = fetchIsolationLevelFromProperties(properties);
}

@Override
Expand Down Expand Up @@ -236,6 +240,11 @@ public String getTypeString() {
return "Null";
}

@Override
public IsolationLevel getIsolationLevel() {
return this.isolationLevel;
}

@Override
public CompletableFuture<Void> clearBacklog() {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand All @@ -69,6 +70,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -263,7 +265,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
option.getSchemaType(), option.getIsolationLevel());
}

@Override
Expand All @@ -273,10 +275,11 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta,
IsolationLevel isolationLevel) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
startMessageId, metadata, readCompacted, resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta, null, null);
replicateSubscriptionState, keySharedMeta, null, null, isolationLevel);
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -287,7 +290,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
SchemaType schemaType) {
SchemaType schemaType,
IsolationLevel isolationLevel) {

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
Expand Down Expand Up @@ -325,6 +329,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
lock.readLock().unlock();
}

AbstractSubscription.wrapIsolationLevelToProperties(subscriptionProperties, isolationLevel);
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, subscriptionProperties));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
Expand Down Expand Up @@ -330,6 +331,7 @@ public synchronized void readMoreEntries() {
minReplayedPosition = null;
}

PositionImpl maxReadPosition = getMaxReadPosition();
// Filter out and skip read delayed messages exist in DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
Predicate<PositionImpl> skipCondition = null;
Expand All @@ -339,10 +341,10 @@ public synchronized void readMoreEntries() {
.containsMessage(position.getLedgerId(), position.getEntryId());
}
cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition(), skipCondition);
maxReadPosition, skipCondition);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal,
topic.getMaxReadPosition());
maxReadPosition);
}
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand All @@ -354,6 +356,11 @@ public synchronized void readMoreEntries() {
}
}

private PositionImpl getMaxReadPosition() {
return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ?
topic.getMaxReadPosition() : PositionImpl.LATEST;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
Expand Down Expand Up @@ -356,7 +357,7 @@ protected void readMoreEntries(Consumer consumer) {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, readEntriesCtx, topic.getMaxReadPosition());
bytesToRead, this, readEntriesCtx, getMaxReadPosition());
}
}
} else {
Expand All @@ -366,6 +367,11 @@ protected void readMoreEntries(Consumer consumer) {
}
}

private PositionImpl getMaxReadPosition() {
return subscription.getIsolationLevel() == IsolationLevel.READ_COMMITTED ?
topic.getMaxReadPosition() : PositionImpl.LATEST;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private final IsolationLevel subscriptionIsolationLevel;
private volatile CompletableFuture<Void> fenceFuture;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
Expand Down Expand Up @@ -160,6 +162,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.pendingAckHandle = new PendingAckHandleDisabled();
}
IS_FENCED_UPDATER.set(this, FALSE);
this.subscriptionIsolationLevel = fetchIsolationLevelFromProperties(subscriptionProperties);
}

public void updateLastMarkDeleteAdvancedTimestamp() {
Expand Down Expand Up @@ -506,6 +509,11 @@ public String getTypeString() {
return "Null";
}

@Override
public IsolationLevel getIsolationLevel() {
return subscriptionIsolationLevel;
}

@Override
public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -133,6 +134,7 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.IsolationLevel;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -797,7 +799,8 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
option.getConsumerEpoch(), option.getSchemaType(),
option.getIsolationLevel());
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -811,7 +814,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
SchemaType schemaType) {
SchemaType schemaType,
IsolationLevel isolationLevel) {
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
Expand Down Expand Up @@ -890,6 +894,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
lock.readLock().unlock();
}


AbstractSubscription.wrapIsolationLevelToProperties(subscriptionProperties, isolationLevel);
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
Expand Down Expand Up @@ -975,10 +981,11 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta) {
KeySharedMeta keySharedMeta,
IsolationLevel isolationLevel) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null);
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, isolationLevel);
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
Expand Down
Loading

0 comments on commit 05a5c46

Please sign in to comment.