diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index cd5acd069e747..23c4cdd84c2d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -406,15 +406,12 @@ protected void reScheduleReadInMs(long readAfterMs) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs); } - Runnable runnable = () -> { - isRescheduleReadInProgress.set(false); - readMoreEntries(); - }; - if (readAfterMs > 0) { - topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS); - } else { - topic.getBrokerService().executor().execute(runnable); - } + topic.getBrokerService().executor().schedule( + () -> { + isRescheduleReadInProgress.set(false); + readMoreEntries(); + }, + readAfterMs, TimeUnit.MILLISECONDS); } } @@ -798,7 +795,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { @@ -811,8 +807,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); - } + lastNumberOfEntriesDispatched = entriesToDispatch; + } return true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index f7326734eaada..f6431b38fb6fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -98,8 +98,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { final String topicName = "persistent://public/default/testTopic"; final String subscriptionName = "testSubscription"; private AtomicInteger consumerMockAvailablePermits; - int retryBackoffInitialTimeInMs = 10; - int retryBackoffMaxTimeInMs = 50; @BeforeMethod public void setup() throws Exception { @@ -109,8 +107,8 @@ public void setup() throws Exception { doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); - doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); + doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); + doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -461,45 +459,34 @@ public void testMessageRedelivery() throws Exception { allEntries.forEach(entry -> entry.release()); } - @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched") - private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() { - return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } }; + @DataProvider(name = "dispatchMessagesInSubscriptionThread") + private Object[][] dispatchMessagesInSubscriptionThread() { + return new Object[][] { { false }, { true } }; } - @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") - public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) + @Test(dataProvider = "dispatchMessagesInSubscriptionThread") + public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread) throws Exception { persistentDispatcher.close(); List retryDelays = new CopyOnWriteArrayList<>(); doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - - PersistentDispatcherMultipleConsumers dispatcher; - if (isKeyShared) { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } + persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; // add a consumer without permits to trigger the retry behavior consumerMockAvailablePermits.set(0); - dispatcher.addConsumer(consumerMock); + persistentDispatcher.addConsumer(consumerMock); // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms"); @@ -507,7 +494,7 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); double delay = retryDelays.get(1); @@ -517,7 +504,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 102); @@ -528,14 +515,14 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); + Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress())); // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms"); @@ -543,89 +530,6 @@ protected void reScheduleReadInMs(long readAfterMs) { ); } - @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") - public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) - throws Exception { - persistentDispatcher.close(); - - // it should be possible to disable the retry delay - // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0 - retryBackoffInitialTimeInMs=0; - retryBackoffMaxTimeInMs=0; - - List retryDelays = new CopyOnWriteArrayList<>(); - doReturn(dispatchMessagesInSubscriptionThread).when(configMock) - .isDispatcherDispatchMessagesInSubscriptionThread(); - - PersistentDispatcherMultipleConsumers dispatcher; - if (isKeyShared) { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } - - // add a consumer without permits to trigger the retry behavior - consumerMockAvailablePermits.set(0); - dispatcher.addConsumer(consumerMock); - - // call "readEntriesComplete" directly to test the retry behavior - List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 1); - assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms"); - } - ); - // test the second retry delay - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 2); - double delay = retryDelays.get(1); - assertEquals(delay, 0, 0, "Second retry delay should be 0ms"); - } - ); - // verify the max retry delay - for (int i = 0; i < 100; i++) { - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - } - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 102); - double delay = retryDelays.get(101); - assertEquals(delay, 0, 0, "Max delay should be 0ms"); - } - ); - // unblock to check that the retry delay is reset - consumerMockAvailablePermits.set(1000); - entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); - - // now block again to check the next retry delay so verify it was reset - consumerMockAvailablePermits.set(0); - entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 103); - assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms"); - } - ); - } - private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); }