Skip to content

Commit

Permalink
[fix][broker] Fix compaction subscription delete by inactive subscrip…
Browse files Browse the repository at this point in the history
…tion check. (apache#20983)

(cherry picked from commit 43cd86d)
  • Loading branch information
lifepuzzlefun authored and shibd committed Oct 22, 2023
1 parent 66ecea4 commit 992987b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2374,7 +2374,9 @@ public void checkInactiveSubscriptions() {
.toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()
|| sub.isReplicated()
|| isCompactionSubscription(subName)) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.http.HttpResponse;
Expand All @@ -76,6 +77,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.client.admin.BrokerStats;
Expand Down Expand Up @@ -106,6 +108,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -1222,6 +1225,69 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
}
}

@Test
public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception {
String namespace = "prop/test";

// set up broker set compaction threshold.
cleanup();
conf.setBrokerServiceCompactionThresholdInBytes(8);
setup();

try {
admin.namespaces().createNamespace(namespace);
} catch (PulsarAdminException.ConflictException e) {
// Ok.. (if test fails intermittently and namespace is already created)
}

// set enable subscription expiration.
admin.namespaces().setSubscriptionExpirationTime(namespace, 1);

String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete";

admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic);

CompletableFuture<Optional<Topic>> topicCf =
pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true);

Optional<Topic> topicOptional = topicCf.get();
assertTrue(topicOptional.isPresent());

PersistentTopic topic = (PersistentTopic) topicOptional.get();

PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION);
assertNotNull(sub);

topic.checkCompaction();

Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction");
currentCompaction.setAccessible(true);
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)currentCompaction.get(topic);

compactionFuture.get();

ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();

// make cursor last active time to very small to check if it will be deleted
Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive");
cursorLastActiveField.setAccessible(true);
cursorLastActiveField.set(cursor, 0);

// replace origin object. so we can check if subscription is deleted.
PersistentSubscription spySubscription = Mockito.spy(sub);
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription);

// trigger inactive check.
topic.checkInactiveSubscriptions();

// Compaction subscription should not call delete method.
Mockito.verify(spySubscription, Mockito.never()).delete();

// check if the subscription exist.
assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));

}

/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.
Expand Down

0 comments on commit 992987b

Please sign in to comment.