diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9a08578ee4088..31fa1c8488d77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3338,7 +3338,7 @@ public void checkUnAckMessageDispatching() { } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) { // unblock broker-dispatching if received enough acked messages back if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) { - unblockDispatchersOnUnAckMessages(blockedDispatchers.values()); + unblockDispatchersOnUnAckMessages(blockedDispatchers.values(), true); } } @@ -3376,13 +3376,17 @@ private void blockDispatchersWithLargeUnAckMessages() { * Unblocks the dispatchers and removes it from the {@link #blockedDispatchers} list. * * @param dispatcherList + * @param allowReadMore */ - public void unblockDispatchersOnUnAckMessages(List dispatcherList) { + public void unblockDispatchersOnUnAckMessages(List dispatcherList, + boolean allowReadMore) { lock.writeLock().lock(); try { dispatcherList.forEach(dispatcher -> { dispatcher.unBlockDispatcherOnUnackedMsgs(); - executor().execute(() -> dispatcher.readMoreEntries()); + if (allowReadMore) { + executor().execute(() -> dispatcher.readMoreEntries()); + } log.info("[{}] Dispatcher is unblocked", dispatcher.getName()); blockedDispatchers.remove(dispatcher); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 19711bfa718f4..748cef1e220e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -390,7 +390,7 @@ public Future sendMessages(final List entries, + " for consumerId: {}; avgMessagesPerEntry is {}", topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } - incrementUnackedMessages(unackedMessages); + addAndGetUnAckedMsgs(this, unackedMessages); Future writeAndFlushPromise = cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); @@ -412,14 +412,6 @@ public Future sendMessages(final List entries, return writeAndFlushPromise; } - private void incrementUnackedMessages(int unackedMessages) { - if (Subscription.isIndividualAckMode(subType) - && addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages() - && getMaxUnackedMessages() > 0) { - blockedConsumerOnUnackedMsgs = true; - } - } - public boolean isWritable() { return cnx.isWritable(); } @@ -793,10 +785,6 @@ public void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0); this.lastConsumedFlowTimestamp = System.currentTimeMillis(); - // block shared consumer when unacked-messages reaches limit - if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) { - blockedConsumerOnUnackedMsgs = true; - } int oldPermits; if (!blockedConsumerOnUnackedMsgs) { oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages); @@ -879,16 +867,6 @@ public boolean checkAndApplyTopicMigration() { } return false; } - /** - * Checks if consumer-blocking on unAckedMessages is allowed for below conditions:
- * a. consumer must have Shared-subscription
- * b. {@link this#getMaxUnackedMessages()} value > 0 - * - * @return - */ - private boolean shouldBlockConsumerOnUnackMsgs() { - return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0; - } public void updateRates() { msgOut.calculateRate(); @@ -1044,15 +1022,6 @@ private boolean removePendingAcks(PositionImpl position) { if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position); } - // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages => - // consumer can start again consuming messages - int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); - if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) - && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) - || !shouldBlockConsumerOnUnackMsgs()) { - ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; - flowConsumerBlockedPermits(ackOwnedConsumer); - } return true; } return false; @@ -1068,7 +1037,7 @@ public int getPriorityLevel() { public void redeliverUnacknowledgedMessages(long consumerEpoch) { // cleanup unackedMessage bucket and redeliver those unack-msgs again - clearUnAckedMsgs(); + UNACKED_MESSAGES_UPDATER.set(this, 0); blockedConsumerOnUnackedMsgs = false; if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId); @@ -1143,10 +1112,24 @@ public Subscription getSubscription() { } private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { - int unackedMsgs = 0; - if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { - subscription.addUnAckedMessages(ackedMessages); - unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + if (!isPersistentTopic || !Subscription.isIndividualAckMode(subType)) { + return 0; + } + subscription.addUnAckedMessages(ackedMessages); + int unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + int maxUnackedMessages = getMaxUnackedMessages(); + if (maxUnackedMessages > 0) { + if (ackedMessages < 0) { + if (unackedMsgs <= maxUnackedMessages / 2 && blockedConsumerOnUnackedMsgs) { + blockedConsumerOnUnackedMsgs = false; + flowConsumerBlockedPermits(this); + } + } else if (ackedMessages > 0) { + // block shared consumer when unacked-messages reaches limit + if (unackedMsgs >= getMaxUnackedMessages()) { + blockedConsumerOnUnackedMsgs = true; + } + } } if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { negativeUnackedMsgsTimestamp = System.currentTimeMillis(); @@ -1155,11 +1138,6 @@ private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { return unackedMsgs; } - private void clearUnAckedMsgs() { - int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0); - subscription.addUnAckedMessages(-unaAckedMsgs); - } - public boolean isPreciseDispatcherFlowControl() { return preciseDispatcherFlowControl; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index f20750fa0c20d..c318816d20236 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; -import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.util.ArrayList; import java.util.Collections; @@ -1005,6 +1004,7 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { + doAddUnAckedMessages(-totalUnackedMessages, false); consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) { redeliveryTracker.incrementAndGetRedeliveryCount((PositionImpl.get(ledgerId, entryId))); @@ -1019,6 +1019,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { + doAddUnAckedMessages(-totalUnackedMessages, false); positions.forEach(position -> { // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages // on Key_Shared subscription, but it's difficult to get the sticky key here @@ -1034,12 +1035,18 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List @Override public void addUnAckedMessages(int numberOfMessages) { + doAddUnAckedMessages(numberOfMessages, true); + } + + private void doAddUnAckedMessages(int numberOfMessages, boolean allowReadMore) { int maxUnackedMessages = topic.getMaxUnackedMessagesOnSubscription(); // don't block dispatching if maxUnackedMessages = 0 if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE && BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name); - readMoreEntriesAsync(); + if (allowReadMore) { + readMoreEntriesAsync(); + } } int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages); @@ -1055,14 +1062,16 @@ public void addUnAckedMessages(int numberOfMessages) { if (totalUnackedMessages < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) { if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { // it removes dispatcher from blocked list and unblocks dispatcher by scheduling read - topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this)); + topic.getBrokerService().unblockDispatchersOnUnAckMessages(List.of(this), allowReadMore); } } - } else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages < maxUnackedMessages / 2) { + } else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages <= maxUnackedMessages / 2) { // unblock dispatcher if it acks back enough messages if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) { log.debug("[{}] Dispatcher is unblocked", name); - readMoreEntriesAsync(); + if (allowReadMore) { + readMoreEntriesAsync(); + } } } // increment broker-level count diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 172842b5ed3bf..41fcece4bc03f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -93,6 +93,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -104,6 +105,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -115,6 +117,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -133,6 +136,7 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -1900,5 +1904,261 @@ public void close() { Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE); } } + + @DataProvider(name = "max_unacked_data") + public static Object[][] max_unacked_data() { + return new Object[][] {{1, 1}, {1, 2}, {1, 5}, {2, 2}, {2, 3}, {4, 4}, {4, 8}, {10, 20}}; + } + + @Test(dataProvider = "max_unacked_data") + public void testConsumerDoesntGetBlocked(int maxUnackedMsgPerConsumer, int maxUnackedMsgPerSubscription) + throws Exception { + doTestThatConsumerDoesntGetBlocked("testConsumerDoesntGetBlocked", + maxUnackedMsgPerConsumer, maxUnackedMsgPerSubscription, consumerBuilder -> { + consumerBuilder.isAckReceiptEnabled(true) + .receiverQueueSize(0); + }); + } + + @Test(dataProvider = "max_unacked_data") + public void testConsumerDoesntGetBlockedWhenReceiverQueueSizeIsSet(int maxUnackedMessagesPerConsumer, + int maxUnackedMessagesPerSubscription) + throws Exception { + doTestThatConsumerDoesntGetBlocked("testConsumerDoesntGetBlockedWhenReceiverQueueSize", + maxUnackedMessagesPerConsumer, maxUnackedMessagesPerSubscription, consumerBuilder -> { + consumerBuilder.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .receiverQueueSize(1); + }); + } + + private void doTestThatConsumerDoesntGetBlocked(String testName, int maxUnackedMessagesPerConsumer, + int maxUnackedMessagesPerSubscription, + java.util.function.Consumer> consumerBuilderCustomizer) + throws PulsarAdminException, PulsarClientException, InterruptedException { + final String ns = + "prop/%s-%d-%d".formatted(testName, maxUnackedMessagesPerConsumer, + maxUnackedMessagesPerSubscription); + + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, maxUnackedMessagesPerConsumer); + admin.namespaces().setMaxUnackedMessagesPerSubscription(ns, maxUnackedMessagesPerSubscription); + + final String topicName = "persistent://" + ns + "/test-topic"; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + String subscriptionName = "sub1"; + + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared); + consumerBuilderCustomizer.accept(consumerBuilder); + @Cleanup + Consumer consumer = consumerBuilder.subscribe(); + + int numberOfMessages = maxUnackedMessagesPerSubscription * 5; + + // Supervisor whether all the work has been done. + CountDownLatch supervisor = new CountDownLatch(numberOfMessages); + + // producer + for (int i = 0; i < numberOfMessages; i++) { + producer.send((("message " + i).getBytes(StandardCharsets.UTF_8))); + } + + // consumer + @Cleanup("interrupt") + Thread consumerThread = new Thread(() -> { + try { + while (!Thread.currentThread().isInterrupted()) { + Message message = consumer.receive(); + if (message == null) { + break; + } + consumer.acknowledge(message); + supervisor.countDown(); + } + } catch (PulsarClientException e) { + log.error("Consumer failed", e); + throw new RuntimeException(e); + } + }); + consumerThread.start(); + + + // Wait until all messages are sent and later processed. + if (!supervisor.await(10, TimeUnit.SECONDS)) { + fail("The test is failed because of latch.await timeout."); + } + + Awaitility.await() + .untilAsserted(() -> { + SubscriptionStats subscriptionStats = + admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName); + ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0); + + int currentReceiverQueueSize = ((ConsumerImpl) consumer).getCurrentReceiverQueueSize(); + int numMessagesInQueue = ((ConsumerImpl) consumer).numMessagesInQueue(); + + long subscriptionUnackedMessages = subscriptionStats.getUnackedMessages(); + int consumerUnackedMessages = consumerStats.getUnackedMessages(); + + boolean blockedSubscriptionOnUnackedMsgs = subscriptionStats.isBlockedSubscriptionOnUnackedMsgs(); + boolean blockedConsumerOnUnackedMsgs = consumerStats.isBlockedConsumerOnUnackedMsgs(); + + log.info("----"); + log.info("subscriptionStats: {}", subscriptionStats); + log.info("currentReceiverQueueSize: {}", currentReceiverQueueSize); + log.info("numMessagesInQueue: {}", numMessagesInQueue); + log.info("subscriptionUnackedMessages: {}", subscriptionUnackedMessages); + log.info("consumerUnackedMessages: {}", consumerUnackedMessages); + log.info("blockedSubscriptionOnUnackedMsgs: {}", blockedSubscriptionOnUnackedMsgs); + log.info("blockedConsumerOnUnackedMsgs: {}", blockedConsumerOnUnackedMsgs); + log.info("----"); + + assertEquals(numMessagesInQueue, 0, "numMessagesInQueue"); + assertEquals(subscriptionUnackedMessages, 0, "subscriptionUnackedMessages"); + assertEquals(consumerUnackedMessages, 0, "consumerUnackedMessages"); + assertFalse(blockedSubscriptionOnUnackedMsgs, "blockedSubscriptionOnUnackedMsgs"); + assertFalse(blockedConsumerOnUnackedMsgs, "blockedConsumerOnUnackedMsgs"); + }); + } + + @Test(dataProvider = "max_unacked_data") + public void testConsumerDoesntGetBlockedMultipleConsumers(int maxUnackedMessagesPerConsumer, + int maxUnackedMessagesPerSubscription) + throws Exception { + final String ns = + "prop/ns-test-%d-%d".formatted(maxUnackedMessagesPerConsumer, maxUnackedMessagesPerSubscription); + + admin.namespaces().createNamespace(ns, 2); + admin.namespaces().setMaxUnackedMessagesPerConsumer(ns, maxUnackedMessagesPerConsumer); + admin.namespaces().setMaxUnackedMessagesPerSubscription(ns, maxUnackedMessagesPerSubscription); + + final String topicName = "persistent://" + ns + "/test-22657-3"; + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + String subscriptionName = "sub1"; + + // numbersOfConsumers = each consumer can have their own max unack + int numbersOfConsumers = maxUnackedMessagesPerSubscription / maxUnackedMessagesPerConsumer; + + List> consumers = new ArrayList<>(); + for (int i = 0; i < numbersOfConsumers; i++) { + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .receiverQueueSize(1) + .subscribe(); + consumers.add(consumer); + } + + int numberOfMessages = maxUnackedMessagesPerSubscription * 5; + CountDownLatch supervisor = new CountDownLatch(numberOfMessages * 2); + + // producer + for (int i = 0; i < numberOfMessages; i++) { + producer.sendAsync((("message " + i).getBytes(StandardCharsets.UTF_8))) + .whenCompleteAsync((messageId, throwable) -> { + if (throwable == null) { + supervisor.countDown(); + } + } + ); + } + + List consumerThreads = new ArrayList<>(); + + // consumers + for (int i = 0; i < consumers.size(); i++) { + int finalI = i; + Consumer consumer = consumers.get(i); + for (int k = 0; k < maxUnackedMessagesPerConsumer; k++) { + Thread consumerThread = new Thread(() -> { + try { + while (!Thread.currentThread().isInterrupted()) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message == null) { + break; + } + + consumer.acknowledge(message); + supervisor.countDown(); + } + } catch (PulsarClientException e) { + log.error("Consumer {} failed", finalI, e); + throw new RuntimeException(e); + } + }); + consumerThread.start(); + consumerThreads.add(consumerThread); + } + } + + @Cleanup + AutoCloseable consumerThreadCleaner = () -> { + for (Thread consumerThread : consumerThreads) { + consumerThread.interrupt(); + consumerThread.join(); + } + }; + + // Wait until all messages are sent and later processed. + if (!supervisor.await(10, TimeUnit.SECONDS)) { + fail("The test is failed because of latch.await timeout."); + } + + Awaitility.await() + .atMost(1, TimeUnit.SECONDS) + .pollDelay(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + SubscriptionStats subscriptionStats = + admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName); + ConsumerStats consumerStats = subscriptionStats.getConsumers().get(0); + + StringBuilder currentReceiverQueueSize = new StringBuilder(); + StringBuilder numMessagesInQueue = new StringBuilder(); + for (Consumer consumer : consumers) { + currentReceiverQueueSize.append(((ConsumerImpl) consumer).getCurrentReceiverQueueSize()) + .append("/"); + numMessagesInQueue.append(((ConsumerImpl) consumer).numMessagesInQueue()).append("/"); + } + currentReceiverQueueSize.deleteCharAt(currentReceiverQueueSize.length() - 1); + numMessagesInQueue.deleteCharAt(numMessagesInQueue.length() - 1); + + long subscriptionUnackedMessages = subscriptionStats.getUnackedMessages(); + int consumerUnackedMessages = consumerStats.getUnackedMessages(); + + boolean blockedSubscriptionOnUnackedMsgs = subscriptionStats.isBlockedSubscriptionOnUnackedMsgs(); + boolean blockedConsumerOnUnackedMsgs = consumerStats.isBlockedConsumerOnUnackedMsgs(); + + log.info("----"); + log.info("subscriptionStats: {}", subscriptionStats); + log.info("currentReceiverQueueSize: {}", currentReceiverQueueSize); + log.info("numMessagesInQueue: {}", numMessagesInQueue); + log.info("subscriptionUnackedMessages: {}", subscriptionUnackedMessages); + log.info("consumerUnackedMessages: {}", consumerUnackedMessages); + log.info("blockedSubscriptionOnUnackedMsgs: {}", blockedSubscriptionOnUnackedMsgs); + log.info("blockedConsumerOnUnackedMsgs: {}", blockedConsumerOnUnackedMsgs); + log.info("----"); + + assertEquals(subscriptionUnackedMessages, 0, "subscriptionUnackedMessages"); + assertEquals(consumerUnackedMessages, 0, "consumerUnackedMessages"); + assertFalse(blockedSubscriptionOnUnackedMsgs, "blockedSubscriptionOnUnackedMsgs"); + assertFalse(blockedConsumerOnUnackedMsgs, "blockedConsumerOnUnackedMsgs"); + }); + } + }