diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a96a7e75506eb..fd00711d05364 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -118,7 +118,7 @@ public class PersistentSubscription extends AbstractSubscription { // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; - protected static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription"; + public static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription"; // Map of properties that is used to mark this subscription as "replicated". // Since this is the only field at this point, we can just keep a static diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 08a481b7051c6..e444b976bc18e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1115,10 +1115,6 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (replicated != null && replicated && !subscription.isReplicated()) { - // Flip the subscription state - subscription.setReplicated(replicated); - } if (startMessageRollbackDurationSec > 0) { resetSubscriptionCursor(subscription, subscriptionFuture, startMessageRollbackDurationSec); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java index 327081bf1b9c8..0d9a4b5164496 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.service.persistent.PersistentSubscription.REPLICATED_SUBSCRIPTION_PROPERTY; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -29,6 +30,7 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -93,4 +95,43 @@ public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) return true; }); } + + @Test(dataProvider = "replicateSubscriptionState") + public void testExistingSubscriptionWithReplicateSubscriptionState(Boolean replicateSubscriptionState) + throws Exception { + String subName = "my-sub-" + System.nanoTime(); + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + + ConsumerBuilder consumer1Builder = pulsarClient.newConsumer().topic(topic).subscriptionName(subName); + if (replicateSubscriptionState != null) { + consumer1Builder.replicateSubscriptionState(replicateSubscriptionState); + } + @Cleanup + Consumer consumer1 = consumer1Builder.subscribe(); + assertReplicatedSubscriptionStatus(topic, subName, replicateSubscriptionState); + consumer1.close(); + + admin.topics().unload(topic); + + ConsumerBuilder consumer2Builder = pulsarClient.newConsumer().topic(topic).subscriptionName(subName); + if (replicateSubscriptionState != null) { + // Reverse + consumer2Builder.replicateSubscriptionState(!replicateSubscriptionState); + } + @Cleanup + Consumer consumer2 = consumer2Builder.subscribe(); + assertReplicatedSubscriptionStatus(topic, subName, replicateSubscriptionState); + consumer2.close(); + } + + private void assertReplicatedSubscriptionStatus(String topic, String subName, Boolean expected) + throws PulsarAdminException { + assertThat(admin.topics().getInternalStats(topic).cursors.get(subName)).isNotNull().matches(n -> { + Long property = n.properties.get(REPLICATED_SUBSCRIPTION_PROPERTY); + assertThat(property).isEqualTo(expected == null || !expected ? null : 1L); + return true; + }); + assertThat(admin.topics().getReplicatedSubscriptionStatus(topic, subName)).containsEntry(topic, + expected != null && expected); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index ed77652c82340..ab2f38366aa16 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -431,8 +431,12 @@ public interface ConsumerBuilder extends Cloneable { ConsumerBuilder maxAcknowledgmentGroupSize(int messageNum); /** + * Configures initial replicated subscription state for a new subscription. + * This setting does not affect existing subscription. Default is `null`. * - * @param replicateSubscriptionState + * @param replicateSubscriptionState If true, the subscription state will be replicated + * across GEO-replicated clusters. If false, replication + * is disabled. */ ConsumerBuilder replicateSubscriptionState(boolean replicateSubscriptionState);