diff --git a/conf/broker.conf b/conf/broker.conf index 7df126ce56d84..3689721e26751 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -462,16 +462,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s. dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 - -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 - # Precise dispathcer flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/conf/standalone.conf b/conf/standalone.conf index ec2c87369e2ad..da8c975996035 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -278,16 +278,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s. dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 - -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 - # Precise dispathcer flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0068a7f1107b0..ff401fdb2b70a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1172,20 +1172,6 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0; - @FieldContext( - category = CATEGORY_POLICIES, - doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " - + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " - + "delay. This parameter sets the initial backoff delay in milliseconds.") - private int dispatcherRetryBackoffInitialTimeInMs = 100; - - @FieldContext( - category = CATEGORY_POLICIES, - doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " - + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " - + "delay. This parameter sets the maximum backoff delay in milliseconds.") - private int dispatcherRetryBackoffMaxTimeInMs = 1000; - @FieldContext( dynamic = true, category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 23c4cdd84c2d9..ae844b5784456 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -47,7 +47,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -85,6 +84,7 @@ */ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback { + protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; @@ -122,8 +122,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false); protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; - protected int lastNumberOfEntriesDispatched; - private final Backoff retryBackoff; + protected enum ReadType { Normal, Replay } @@ -148,15 +147,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay); - ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration(); this.readFailureBackoff = new Backoff( - serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(), + topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); - retryBackoff = new Backoff( - serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, - serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS, - 0, TimeUnit.MILLISECONDS); } @Override @@ -398,20 +392,16 @@ public synchronized void readMoreEntries() { @Override protected void reScheduleRead() { - reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS); - } - - protected void reScheduleReadInMs(long readAfterMs) { if (isRescheduleReadInProgress.compareAndSet(false, true)) { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs); + log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS); } topic.getBrokerService().executor().schedule( () -> { isRescheduleReadInProgress.set(false); readMoreEntries(); }, - readAfterMs, TimeUnit.MILLISECONDS); + MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); } } @@ -622,8 +612,8 @@ public final synchronized void readEntriesComplete(List entries, Object c log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } - long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum(); - updatePendingBytesToDispatch(totalBytesSize); + long size = entries.stream().mapToLong(Entry::getLength).sum(); + updatePendingBytesToDispatch(size); // dispatch messages to a separate thread, but still in order for this subscription // sendMessagesToConsumers is responsible for running broker-side filters @@ -633,28 +623,19 @@ public final synchronized void readEntriesComplete(List entries, Object c // in a separate thread, and we want to prevent more reads acquireSendInProgress(); dispatchMessagesThread.execute(() -> { - handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize); + if (sendMessagesToConsumers(readType, entries, false)) { + updatePendingBytesToDispatch(-size); + readMoreEntries(); + } else { + updatePendingBytesToDispatch(-size); + } }); } else { - handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize); - } - } - - private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List entries, - boolean needAcquireSendInProgress, - long totalBytesSize) { - boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress); - int entriesDispatched = lastNumberOfEntriesDispatched; - updatePendingBytesToDispatch(-totalBytesSize); - if (triggerReadingMore) { - if (entriesDispatched > 0) { - // Reset the backoff when we successfully dispatched messages - retryBackoff.reset(); - // Call readMoreEntries in the same thread to trigger the next read - readMoreEntries(); - } else if (entriesDispatched == 0) { - // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay - reScheduleReadInMs(retryBackoff.next()); + if (sendMessagesToConsumers(readType, entries, true)) { + updatePendingBytesToDispatch(-size); + readMoreEntriesAsync(); + } else { + updatePendingBytesToDispatch(-size); } } } @@ -693,7 +674,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } - lastNumberOfEntriesDispatched = 0; int entriesToDispatch = entries.size(); // Trigger read more messages @@ -807,8 +787,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); - - lastNumberOfEntriesDispatched = entriesToDispatch; } return true; } @@ -871,7 +849,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); return numConsumers.get() == 0; // trigger a new readMoreEntries() call diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 397cb7226b767..2df9f38531f5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -178,7 +178,6 @@ protected Map> initialValue() throws Exception { @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - lastNumberOfEntriesDispatched = 0; long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; @@ -313,8 +312,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - - lastNumberOfEntriesDispatched = (int) totalEntries; // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index e7ec93a5d6a24..df017eaf3a8f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -234,9 +234,6 @@ protected void doInitConf() throws Exception { this.conf.setWebServicePort(Optional.of(0)); this.conf.setNumExecutorThreadPoolSize(5); this.conf.setExposeBundlesMetricsInPrometheus(true); - // Disable the dispatcher retry backoff in tests by default - this.conf.setDispatcherRetryBackoffInitialTimeInMs(0); - this.conf.setDispatcherRetryBackoffMaxTimeInMs(0); } protected final void init() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index f6431b38fb6fc..7e1b5f8c71e6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -35,7 +35,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; @@ -49,7 +48,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -77,7 +75,6 @@ import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -97,7 +94,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { final String topicName = "persistent://public/default/testTopic"; final String subscriptionName = "testSubscription"; - private AtomicInteger consumerMockAvailablePermits; @BeforeMethod public void setup() throws Exception { @@ -107,8 +103,7 @@ public void setup() throws Exception { doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); - doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); + pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -140,8 +135,7 @@ public void setup() throws Exception { consumerMock = mock(Consumer.class); channelMock = mock(ChannelPromise.class); doReturn("consumer1").when(consumerMock).consumerName(); - consumerMockAvailablePermits = new AtomicInteger(1000); - doAnswer(invocation -> consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits(); + doReturn(1000).when(consumerMock).getAvailablePermits(); doReturn(true).when(consumerMock).isWritable(); doReturn(channelMock).when(consumerMock).sendMessages( anyList(), @@ -459,77 +453,6 @@ public void testMessageRedelivery() throws Exception { allEntries.forEach(entry -> entry.release()); } - @DataProvider(name = "dispatchMessagesInSubscriptionThread") - private Object[][] dispatchMessagesInSubscriptionThread() { - return new Object[][] { { false }, { true } }; - } - - @Test(dataProvider = "dispatchMessagesInSubscriptionThread") - public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread) - throws Exception { - persistentDispatcher.close(); - - List retryDelays = new CopyOnWriteArrayList<>(); - doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - - // add a consumer without permits to trigger the retry behavior - consumerMockAvailablePermits.set(0); - persistentDispatcher.addConsumer(consumerMock); - - // call "readEntriesComplete" directly to test the retry behavior - List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 1); - assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms"); - } - ); - // test the second retry delay - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 2); - double delay = retryDelays.get(1); - assertEquals(delay, 20.0, 2.0, "Second retry delay should be 20ms (jitter <-10%)"); - } - ); - // verify the max retry delay - for (int i = 0; i < 100; i++) { - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - } - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 102); - double delay = retryDelays.get(101); - assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms (jitter <-10%)"); - } - ); - // unblock to check that the retry delay is reset - consumerMockAvailablePermits.set(1000); - entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress())); - - // now block again to check the next retry delay so verify it was reset - consumerMockAvailablePermits.set(0); - entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 103); - assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms"); - } - ); - } - private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 26f1686db679f..3a83d2f95fea0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -162,9 +162,6 @@ protected void startBroker() throws Exception { conf.setBrokerDeduplicationEnabled(true); conf.setTransactionBufferSnapshotMaxTransactionCount(2); conf.setTransactionBufferSnapshotMinTimeInMillis(2000); - // Disable the dispatcher retry backoff in tests by default - conf.setDispatcherRetryBackoffInitialTimeInMs(0); - conf.setDispatcherRetryBackoffMaxTimeInMs(0); serviceConfigurationList.add(conf); PulsarTestContext.Builder testContextBuilder =