diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6f3a704c4f378..4a9d0069c9e0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2374,7 +2374,9 @@ public void checkInactiveSubscriptions() { .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { - if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() + || sub.isReplicated() + || isCompactionSubscription(subName)) { return; } if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index d4ae0da3617e3..b91e9c1aff994 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.http.HttpResponse; @@ -76,6 +77,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; @@ -106,6 +108,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.compaction.Compactor; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -1222,6 +1225,69 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex } } + @Test + public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception { + String namespace = "prop/test"; + + // set up broker set compaction threshold. + cleanup(); + conf.setBrokerServiceCompactionThresholdInBytes(8); + setup(); + + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } + + // set enable subscription expiration. + admin.namespaces().setSubscriptionExpirationTime(namespace, 1); + + String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete"; + + admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic); + + CompletableFuture> topicCf = + pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true); + + Optional topicOptional = topicCf.get(); + assertTrue(topicOptional.isPresent()); + + PersistentTopic topic = (PersistentTopic) topicOptional.get(); + + PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION); + assertNotNull(sub); + + topic.checkCompaction(); + + Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction"); + currentCompaction.setAccessible(true); + CompletableFuture compactionFuture = (CompletableFuture)currentCompaction.get(topic); + + compactionFuture.get(); + + ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor(); + + // make cursor last active time to very small to check if it will be deleted + Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive"); + cursorLastActiveField.setAccessible(true); + cursorLastActiveField.set(cursor, 0); + + // replace origin object. so we can check if subscription is deleted. + PersistentSubscription spySubscription = Mockito.spy(sub); + topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription); + + // trigger inactive check. + topic.checkInactiveSubscriptions(); + + // Compaction subscription should not call delete method. + Mockito.verify(spySubscription, Mockito.never()).delete(); + + // check if the subscription exist. + assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); + + } + /** * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and * it should not introduce deadlock while performing it.