From 67605e700a39e27d2f198fddaeee55d2c9ac8755 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 21 Oct 2024 14:11:34 +0300 Subject: [PATCH 01/28] Add solution to PulsarMockBookKeeper for intercepting reads (cherry picked from commit 005b5c0e3f6c273695253d9c3a9d0a2725b9d54f) --- .../client/PulsarMockBookKeeper.java | 8 +++- .../client/PulsarMockLedgerHandle.java | 2 +- .../client/PulsarMockReadHandle.java | 31 +++++++++----- .../PulsarMockReadHandleInterceptor.java | 40 +++++++++++++++++++ 4 files changed, 68 insertions(+), 13 deletions(-) create mode 100644 testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 1e979206e16d3..344173c30918d 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -40,6 +40,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -96,6 +98,9 @@ public static Collection getMockEnsemble() { final Queue addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>(); final List> failures = new ArrayList<>(); final List> addEntryFailures = new ArrayList<>(); + @Setter + @Getter + private volatile PulsarMockReadHandleInterceptor readHandleInterceptor; public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; @@ -250,7 +255,8 @@ public CompletableFuture execute() { return FutureUtils.exception(new 
BKException.BKUnauthorizedAccessException()); } else { return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, - lh.getLedgerMetadata(), lh.entries)); + lh.getLedgerMetadata(), lh.entries, + PulsarMockBookKeeper.this::getReadHandleInterceptor)); } }); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index aa61e541d0d6b..d30684e604670 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, this.digest = digest; this.passwd = Arrays.copyOf(passwd, passwd.length); - readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries); + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index a4361f62254e4..9f3f4969199ce 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -40,28 +41,36 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; + private final Supplier readHandleInterceptorSupplier; PulsarMockReadHandle(PulsarMockBookKeeper bk, long 
ledgerId, LedgerMetadata metadata, - List entries) { + List entries, + Supplier readHandleInterceptorSupplier) { this.bk = bk; this.ledgerId = ledgerId; this.metadata = metadata; this.entries = entries; + this.readHandleInterceptorSupplier = readHandleInterceptorSupplier; } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { return bk.getProgrammedFailure().thenComposeAsync((res) -> { - log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); - List seq = new ArrayList<>(); - long entryId = firstEntry; - while (entryId <= lastEntry && entryId < entries.size()) { - seq.add(entries.get((int) entryId++).duplicate()); - } - log.debug("Entries read: {}", seq); - - return FutureUtils.value(LedgerEntriesImpl.create(seq)); - }); + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + List seq = new ArrayList<>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(entries.get((int) entryId++).duplicate()); + } + log.debug("Entries read: {}", seq); + LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq); + PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get(); + if (pulsarMockReadHandleInterceptor != null) { + return pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, lastEntry, + ledgerEntries); + } + return FutureUtils.value(ledgerEntries); + }); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java new file mode 100644 index 0000000000000..acee87b0f77f4 --- /dev/null +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.LedgerEntries; + +/** + * Interceptor interface for intercepting read handle readAsync operations. + * This is useful for testing purposes, for example for introducing delays. + */ +public interface PulsarMockReadHandleInterceptor { + /** + * Intercepts the readAsync operation on a read handle. 
+ * + * @param ledgerId ledger id + * @param firstEntry first entry to read + * @param lastEntry last entry to read + * @param entries entries that would be returned by the read operation + * @return CompletableFuture that will complete with the entries to return + */ + CompletableFuture interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, + LedgerEntries entries); +} From e40461f85c9fd8a917797def51183485163e8bc7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 17 Jan 2025 18:10:15 +0200 Subject: [PATCH 02/28] Improve quiet time implementation in receiveMessages --- .../apache/pulsar/broker/BrokerTestUtil.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index 8364cae53b223..fee0ababe114d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -239,33 +240,42 @@ public static void receiveMessages(BiFunction, Message, Boole public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, Duration quietTimeout, Stream> consumers) { + long quietTimeoutNanos = quietTimeout.toNanos(); + AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); FutureUtil.waitForAll(consumers - .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join(); + .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos)).toList()).join(); } // asynchronously 
receive messages from a consumer and handle them using the provided message handler // the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads // this is useful in tests where multiple consumers are needed to test the functionality - private static CompletableFuture receiveMessagesAsync(Consumer consumer, Duration quietTimeout, - BiFunction, Message, Boolean> - messageHandler) { - CompletableFuture> receiveFuture = consumer.receiveAsync(); - return receiveFuture - .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS) + private static CompletableFuture receiveMessagesAsync(Consumer consumer, + long quietTimeoutNanos, + BiFunction, Message, Boolean> + messageHandler, + AtomicLong lastMessageReceivedNanos) { + return consumer.receiveAsync() + .orTimeout(quietTimeoutNanos, TimeUnit.NANOSECONDS) .handle((msg, t) -> { + long currentNanos = System.nanoTime(); if (t != null) { if (t instanceof TimeoutException) { - // cancel the receive future so that Pulsar client can clean up the resources - receiveFuture.cancel(false); - return false; + if (currentNanos - lastMessageReceivedNanos.get() > quietTimeoutNanos) { + return false; + } else { + return true; + } } else { throw FutureUtil.wrapToCompletionException(t); } } + lastMessageReceivedNanos.set(currentNanos); return messageHandler.apply(consumer, msg); }).thenComposeAsync(receiveMore -> { if (receiveMore) { - return receiveMessagesAsync(consumer, quietTimeout, messageHandler); + return receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos); } else { return CompletableFuture.completedFuture(null); } From 7e69af577c74555d05e99440920a1d8dbc7caea2 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 17 Jan 2025 17:23:52 +0200 Subject: [PATCH 03/28] Add debug when added to replay --- .../persistent/PersistentDispatcherMultipleConsumers.java | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index fa03a260e131e..82b96c365072f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1422,6 +1422,9 @@ public void cursorIsReset() { protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) { if (checkIfMessageIsUnacked(ledgerId, entryId)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Adding message to replay for {}:{} hash: {}", name, ledgerId, entryId, stickyKeyHash); + } redeliveryMessages.add(ledgerId, entryId, stickyKeyHash); return true; } else { From 6a354dfaf34f4bafb2ea56be65a14b83c294e377 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 20 Jan 2025 21:03:59 +0200 Subject: [PATCH 04/28] Enable test logging at debug level, add more logging --- pulsar-broker/src/test/resources/log4j2.xml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index a0732096f2845..b506421d5bb0a 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -36,16 +36,21 @@ - + + + + + + - --> From 7c1d3db79c7db633771de56d5e503115f8a010c7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 20 Jan 2025 21:05:52 +0200 Subject: [PATCH 05/28] Cancel pending read --- ...PersistentDispatcherMultipleConsumers.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 82b96c365072f..a290830c7d633 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -335,6 +335,13 @@ public synchronized void readMoreEntries() { } return; } + if (havePendingReplayRead) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping read for the topic, Due to replay in-progress.", topic.getName(), + getSubscriptionName()); + } + return; + } if (isSendInProgress()) { // we cannot read more entries while sending the previous batch // otherwise we could re-read the same entries and send duplicates @@ -379,13 +386,19 @@ public synchronized void readMoreEntries() { long bytesToRead = calculateResult.getRight(); if (messagesToRead == -1 || bytesToRead == -1) { - // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete. + // Skip read as topic/dispatcher has exceed the dispatch rate return; } Set messagesToReplayNow = canReplayMessages() ? 
getMessagesToReplayNow(messagesToRead) : Collections.emptySet(); if (!messagesToReplayNow.isEmpty()) { + // before replaying, cancel possible pending read that is waiting for more entries + cancelPendingRead(); + if (havePendingRead) { + // skip read since a pending read is already in progress which cannot be cancelled + return; + } if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), consumerList.size()); @@ -615,13 +628,6 @@ protected Pair calculateToRead(int currentTotalAvailablePermits) } } - if (havePendingReplayRead) { - if (log.isDebugEnabled()) { - log.debug("[{}] Skipping replay while awaiting previous read to complete", name); - } - return Pair.of(-1, -1L); - } - // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException messagesToRead = Math.max(messagesToRead, 1); bytesToRead = Math.max(bytesToRead, 1); @@ -717,6 +723,12 @@ public SubType getType() { public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; if (readType == ReadType.Normal) { + if (!havePendingRead) { + log.debug("Discarding read entries as there is no pending read"); + entries.forEach(Entry::release); + readMoreEntriesAsync(); + return; + } havePendingRead = false; } else { havePendingReplayRead = false; From 47f7583e077a9c90741506abdb49fd567d5ba45f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 09:36:42 +0200 Subject: [PATCH 06/28] Add debug log to skipping pending replay read --- .../persistent/PersistentDispatcherMultipleConsumers.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a290830c7d633..29c22642ac785 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -397,6 +397,10 @@ public synchronized void readMoreEntries() { cancelPendingRead(); if (havePendingRead) { // skip read since a pending read is already in progress which cannot be cancelled + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping replay read for the topic, Due to pending read in-progress.", + topic.getName(), getSubscriptionName()); + } return; } if (log.isDebugEnabled()) { From 4dc843b3fd9a587bbf48357a0376be517808e216 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 16 Jan 2025 14:58:52 +0200 Subject: [PATCH 07/28] Add test --- ...edSubscriptionDisabledBrokerCacheTest.java | 332 ++++++++++++++++++ 1 file changed, 332 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java new file mode 100644 index 0000000000000..8da86a63513c8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.fail; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.StickyKeyDispatcher; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.tests.KeySharedImplementationType; +import org.awaitility.Awaitility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; 
+import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class KeySharedSubscriptionDisabledBrokerCacheTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionDisabledBrokerCacheTest.class); + private static final List keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + private static final String SUBSCRIPTION_NAME = "key_shared"; + private final KeySharedImplementationType implementationType; + + // Comment out the next line (Factory annotation) to run tests manually in IntelliJ, one-by-one + //@Factory + public static Object[] createTestInstances() { + return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionDisabledBrokerCacheTest::new); + } + + public KeySharedSubscriptionDisabledBrokerCacheTest() { + // set the default implementation type for manual running in IntelliJ + this(KeySharedImplementationType.PIP379); + } + + public KeySharedSubscriptionDisabledBrokerCacheTest(KeySharedImplementationType implementationType) { + this.implementationType = implementationType; + } + + @DataProvider(name = "currentImplementationType") + public Object[] currentImplementationType() { + return new Object[]{ implementationType }; + } + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setSubscriptionKeySharedUseClassicPersistentImplementation(implementationType.classic); + conf.setSubscriptionSharedUseClassicPersistentImplementation(implementationType.classic); + this.conf.setUnblockStuckSubscriptionEnabled(false); + this.conf.setSubscriptionKeySharedUseConsistentHashing(true); + conf.setManagedLedgerCacheSizeMB(0); + conf.setManagedLedgerMaxReadsInFlightSizeInMB(0); + conf.setDispatcherRetryBackoffInitialTimeInMs(0); + conf.setDispatcherRetryBackoffMaxTimeInMs(0); + 
conf.setKeySharedUnblockingIntervalMs(0); + conf.setBrokerDeduplicationEnabled(true); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @AfterMethod(alwaysRun = true) + public void resetDefaultNamespace() throws Exception { + List list = admin.namespaces().getTopics("public/default"); + for (String topicName : list){ + if (!pulsar.getBrokerService().isSystemTopic(topicName)) { + admin.topics().delete(topicName, false); + } + } + // reset read ahead limits to defaults + ServiceConfiguration defaultConf = new ServiceConfiguration(); + conf.setKeySharedLookAheadMsgInReplayThresholdPerSubscription( + defaultConf.getKeySharedLookAheadMsgInReplayThresholdPerSubscription()); + conf.setKeySharedLookAheadMsgInReplayThresholdPerConsumer( + defaultConf.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()); + } + + @AfterMethod(alwaysRun = true) + public void resetInterceptor() { + pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null); + } + + // Use a fixed seed to make the tests using random values deterministic + // When a test fails, it's possible to re-run it to reproduce the issue + private static final Random random = new Random(1); + + private Producer createProducer(String topic, boolean enableBatch) throws PulsarClientException { + Producer producer = null; + if (enableBatch) { + producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(true) + .maxPendingMessages(2001) + .batcherBuilder(BatcherBuilder.KEY_BASED) + .create(); + } else { + producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .maxPendingMessages(2001) + .enableBatching(false) + .create(); + } + return producer; + } + + @SneakyThrows + private StickyKeyConsumerSelector getSelector(String topic, String subscription) { + Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + 
PersistentSubscription sub = (PersistentSubscription) t.getSubscription(subscription); + StickyKeyDispatcher dispatcher = (StickyKeyDispatcher) sub.getDispatcher(); + return dispatcher.getSelector(); + } + + @Test(dataProvider = "currentImplementationType", invocationCount = 1) + public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationType impl) throws Exception { + String topic = newUniqueName("testMessageOrderInSingleConsumerReconnect"); + int numberOfKeys = 100; + long pauseTime = 100L; + + @Cleanup + PulsarClient pulsarClient2 = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + + @Cleanup + PulsarClient pulsarClient3 = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + + @Cleanup + Producer producer = createProducer(topic, false); + + // create a consumer and close it to create a subscription + pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe() + .close(); + + Set remainingMessageValues = Collections.synchronizedSet(new HashSet<>()); + BlockingQueue, Message>> unackedMessages = new LinkedBlockingQueue<>(); + AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true); + Set keysForC2 = new HashSet<>(); + AtomicLong lastMessageTimestamp = new AtomicLong(System.currentTimeMillis()); + + Map> keyPositions = new HashMap<>(); + MessageListener messageHandler = (consumer, msg) -> { + lastMessageTimestamp.set(System.currentTimeMillis()); + synchronized (this) { + String key = msg.getKey(); + if (c2MessagesShouldBeUnacked.get() && keysForC2.contains(key)) { + unackedMessages.add(Pair.of(consumer, msg)); + return; + } + long delayMillis = ThreadLocalRandom.current().nextLong(25, 50); + CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS).execute(() -> + consumer.acknowledgeAsync(msg)); + MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); + Position 
currentPosition = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + Pair prevPair = keyPositions.get(key); + if (prevPair != null && prevPair.getLeft().compareTo(currentPosition) > 0) { + boolean isDuplicate = !remainingMessageValues.contains(msg.getValue()); + log.error("key: {} value: {} prev: {}/{} current: {}/{} duplicate: {}", key, msg.getValue(), prevPair.getLeft(), + prevPair.getRight(), currentPosition, consumer.getConsumerName(), isDuplicate); + fail("out of order"); + } + keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName())); + boolean removed = remainingMessageValues.remove(msg.getValue()); + if (!removed) { + // duplicates are possible during reconnects, this is not an error + log.warn("Duplicate message: {} value: {}", msg.getMessageId(), msg.getValue()); + } + } + }; + + // Adding a new consumer. + @Cleanup + Consumer c1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c1") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + @Cleanup + Consumer c2 = pulsarClient2.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + @Cleanup + Consumer c3 = pulsarClient3.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c3") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .subscribe(); + + StickyKeyConsumerSelector selector = getSelector(topic, SUBSCRIPTION_NAME); + + // find keys that will be assigned to c2 + for (int i = 0; i < numberOfKeys; i++) { + String key = String.valueOf(i); + byte[] keyBytes = key.getBytes(UTF_8); + int hash = selector.makeStickyKeyHash(keyBytes); + if (selector.select(hash).consumerName().equals("c2")) { + keysForC2.add(key); + } + } + + // close c2 + 
c2.close(); + Thread.sleep(pauseTime); + + // produce messages with random keys + for (int i = 0; i < 1000; i++) { + String key = String.valueOf(random.nextInt(numberOfKeys)); + //log.info("Producing message with key: {} value: {}", key, i); + producer.newMessage() + .key(key) + .value(i) + .send(); + remainingMessageValues.add(i); + } + + // reconnect c2 + c2 = pulsarClient2.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(SUBSCRIPTION_NAME) + .subscriptionType(SubscriptionType.Key_Shared) + .messageListener(messageHandler) + .startPaused(true) + .subscribe(); + + Thread.sleep(2 * pauseTime); + + // produce messages with c2 keys so that possible race conditions would be more likely to happen + List keysForC2List=new ArrayList<>(keysForC2); + for (int i = 1000; i < 1100; i++) { + String key = keysForC2List.get(random.nextInt(keysForC2List.size())); + log.info("Producing message with key: {} value: {}", key, i); + producer.newMessage() + .key(key) + .value(i) + .send(); + remainingMessageValues.add(i); + } + + Thread.sleep(2 * pauseTime); + + log.info("Acking unacked messages to unblock c2 keys"); + // ack the unacked messages to unblock c2 keys + c2MessagesShouldBeUnacked.set(false); + Pair, Message> consumerMessagePair; + while ((consumerMessagePair = unackedMessages.poll()) != null) { + messageHandler.received(consumerMessagePair.getLeft(), consumerMessagePair.getRight()); + } + + // resume c2 so that permits are while hashes are unblocked so that possible race conditions would + // be more likely to happen + log.info("Resuming c2"); + c2.resume(); + + Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> { + return remainingMessageValues.isEmpty() + || System.currentTimeMillis() - lastMessageTimestamp.get() > 50 * pauseTime; + }); + + try { + assertThat(remainingMessageValues).isEmpty(); + } finally { + logTopicStats(topic); + } + } +} From 0512f3817a6f451180a922f48603e0b30b09032c Mon Sep 17 00:00:00 2001 From: Lari 
Hotari Date: Tue, 21 Jan 2025 12:57:01 +0200 Subject: [PATCH 08/28] Notify also when the consumer isn't closing --- .../broker/service/DrainingHashesTracker.java | 2 +- .../service/DrainingHashesTrackerTest.java | 21 ------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 9bc5c5f1e44ec..fff20e86e668c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -340,7 +340,7 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) { lock.writeLock().lock(); try { removed = drainingHashes.remove(stickyHash); - if (!closing && removed.isBlocking()) { + if (removed.isBlocking()) { if (batchLevel > 0) { unblockedWhileBatching = true; } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java index ecb20beeb648a..9c409028f6d23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.createMockConsumer; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -191,23 +189,4 @@ public void unblockingHandler_InvokesStickyKeyHashUnblocked() { // then unblocking call should be done 
verify(unblockingHandler).stickyKeyHashUnblocked(1); } - - @Test - public void unblockingHandler_DoesNotInvokeStickyKeyHashUnblockedWhenClosing() { - // given a tracker with unblocking handler - UnblockingHandler unblockingHandler = mock(UnblockingHandler.class); - DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", unblockingHandler); - - // when a hash is draining - Consumer consumer = createMockConsumer("consumer1"); - tracker.addEntry(consumer, 1); - // aand hash gets blocked - Consumer consumer2 = createMockConsumer("consumer2"); - tracker.shouldBlockStickyKeyHash(consumer2, 1); - // and hash gets unblocked - tracker.reduceRefCount(consumer, 1, true); - - // then unblocking call should be done - verify(unblockingHandler, never()).stickyKeyHashUnblocked(anyInt()); - } } \ No newline at end of file From 3913e6413a3c866639bbddd460aadc60ff1d7987 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 12:17:18 +0200 Subject: [PATCH 09/28] Postpone removals after critical sections to prevent race conditions --- .../broker/service/DrainingHashesTracker.java | 94 ++++++------------ .../OutsideCriticalSectionsExecutor.java | 97 +++++++++++++++++++ ...tStickyKeyDispatcherMultipleConsumers.java | 23 ++++- .../service/DrainingHashesTrackerTest.java | 52 ---------- 4 files changed, 149 insertions(+), 117 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index fff20e86e668c..8ccfc49ddcc8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.PrimitiveIterator; import 
java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import lombok.ToString; @@ -50,10 +51,9 @@ public class DrainingHashesTracker { // optimize the memory consumption of the map by using primitive int keys private final Int2ObjectOpenHashMap drainingHashes = new Int2ObjectOpenHashMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - int batchLevel; - boolean unblockedWhileBatching; private final Map consumerDrainingHashesStatsMap = new ConcurrentHashMap<>(); + private final Executor removalExecutor; /** * Represents an entry in the draining hashes tracker. @@ -220,8 +220,13 @@ public interface UnblockingHandler { } public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler) { + this(dispatcherName, unblockingHandler, Runnable::run); + } + + public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler, Executor removalExecutor) { this.dispatcherName = dispatcherName; this.unblockingHandler = unblockingHandler; + this.removalExecutor = removalExecutor; } /** @@ -274,39 +279,6 @@ public void addEntry(Consumer consumer, int stickyHash) { } } - /** - * Start a batch operation. There could be multiple nested batch operations. - * The unblocking of sticky key hashes will be done only when the last batch operation ends. - */ - public void startBatch() { - lock.writeLock().lock(); - try { - batchLevel++; - } finally { - lock.writeLock().unlock(); - } - } - - /** - * End a batch operation. 
- */ - public void endBatch() { - boolean notifyUnblocking = false; - lock.writeLock().lock(); - try { - if (--batchLevel == 0 && unblockedWhileBatching) { - unblockedWhileBatching = false; - notifyUnblocking = true; - } - } finally { - lock.writeLock().unlock(); - } - // notify unblocking of the hash outside the lock - if (notifyUnblocking) { - unblockingHandler.stickyKeyHashUnblocked(-1); - } - } - /** * Reduce the reference count for a given sticky hash. * @@ -330,40 +302,38 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) { + "."); } if (entry.decrementRefCount()) { - if (log.isDebugEnabled()) { - log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, - consumer.consumerId(), consumer.consumerName()); - } + removalExecutor.execute(() -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, + consumer.consumerId(), consumer.consumerName()); + } - DrainingHashEntry removed; - boolean notifyUnblocking = false; - lock.writeLock().lock(); - try { - removed = drainingHashes.remove(stickyHash); - if (removed.isBlocking()) { - if (batchLevel > 0) { - unblockedWhileBatching = true; - } else { + DrainingHashEntry removed; + boolean notifyUnblocking = false; + lock.writeLock().lock(); + try { + removed = drainingHashes.remove(stickyHash); + if (removed.isBlocking()) { notifyUnblocking = true; } + } finally { + lock.writeLock().unlock(); } - } finally { - lock.writeLock().unlock(); - } - // perform side-effects outside of the lock to reduce chances for deadlocks + // perform side-effects outside of the lock to reduce chances for deadlocks - // update the consumer specific stats - ConsumerDrainingHashesStats drainingHashesStats = - consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); - if (drainingHashesStats != null) { - drainingHashesStats.clearHash(stickyHash); - } + // update the consumer specific stats 
+ ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyHash); + } - // notify unblocking of the hash outside the lock - if (notifyUnblocking) { - unblockingHandler.stickyKeyHashUnblocked(stickyHash); - } + // notify unblocking of the hash outside the lock + if (notifyUnblocking) { + unblockingHandler.stickyKeyHashUnblocked(stickyHash); + } + }); } else { if (log.isDebugEnabled()) { log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java new file mode 100644 index 0000000000000..77921d1a976c6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Executor that runs tasks in the current thread when + * there aren't any critical sections in execution. + */ +public class OutsideCriticalSectionsExecutor implements Executor { + private final AtomicInteger criticalSectionsCount = new AtomicInteger(); + private final Queue queuedTasks = new ConcurrentLinkedQueue<>(); + private final ReadWriteLock executionLock = new ReentrantReadWriteLock(); + + @Override + public void execute(Runnable command) { + executionLock.writeLock().lock(); + try { + if (criticalSectionsCount.get() == 0) { + command.run(); + } else { + queuedTasks.add(command); + } + } finally { + executionLock.writeLock().unlock(); + } + } + + public void enterCriticalSection() { + executionLock.readLock().lock(); + try { + criticalSectionsCount.incrementAndGet(); + } finally { + executionLock.readLock().unlock(); + } + } + + public void exitCriticalSection() { + if (criticalSectionsCount.decrementAndGet() == 0) { + runQueuedTasks(); + } + } + + public T runCriticalSectionCallable(Callable callable) { + executionLock.readLock().lock(); + try { + criticalSectionsCount.incrementAndGet(); + return callable.call(); + } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } finally { + executionLock.readLock().unlock(); + exitCriticalSection(); + } + } + + private void runQueuedTasks() { + executionLock.writeLock().lock(); + try { + if (criticalSectionsCount.get() != 0) { + return; + } + Runnable command; + while ((command = queuedTasks.poll()) != null) { + command.run(); + } + } finally { + 
executionLock.writeLock().unlock(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 1a3e2f706cba8..952106b5f5d64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -47,6 +48,7 @@ import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.ImpactedConsumersResult; +import org.apache.pulsar.broker.service.OutsideCriticalSectionsExecutor; import org.apache.pulsar.broker.service.PendingAcksMap; import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; @@ -74,6 +76,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private final DrainingHashesTracker drainingHashesTracker; private final RescheduleReadHandler rescheduleReadHandler; + private final OutsideCriticalSectionsExecutor outsideCriticalSectionsExecutor = + new OutsideCriticalSectionsExecutor(); PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { @@ -85,7 +89,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi this.drainingHashesRequired = keySharedMode == 
KeySharedMode.AUTO_SPLIT && !allowOutOfOrderDelivery; this.drainingHashesTracker = - drainingHashesRequired ? new DrainingHashesTracker(this.getName(), this::stickyKeyHashUnblocked) : null; + drainingHashesRequired ? new DrainingHashesTracker(this.getName(), this::stickyKeyHashUnblocked, + outsideCriticalSectionsExecutor) : null; this.rescheduleReadHandler = new RescheduleReadHandler(conf::getKeySharedUnblockingIntervalMs, topic.getBrokerService().executor(), this::cancelPendingRead, () -> reScheduleReadInMs(0), () -> havePendingRead, this::getReadMoreEntriesCallCount, () -> !redeliveryMessages.isEmpty()); @@ -149,12 +154,12 @@ public void handleRemoving(Consumer consumer, long ledgerId, long entryId, int s @Override public void startBatch() { - drainingHashesTracker.startBatch(); + outsideCriticalSectionsExecutor.enterCriticalSection(); } @Override public void endBatch() { - drainingHashesTracker.endBatch(); + outsideCriticalSectionsExecutor.exitCriticalSection(); } }); consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats); @@ -206,8 +211,20 @@ protected synchronized void clearComponentsAfterRemovedAllConsumers() { } } + @Override + protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { + return outsideCriticalSectionsExecutor.runCriticalSectionCallable( + () -> super.getMessagesToReplayNow(maxMessagesToRead)); + } + @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { + return outsideCriticalSectionsExecutor.runCriticalSectionCallable(() -> { + return internalTrySendMessagesToConsumers(readType, entries); + }); + } + + private synchronized boolean internalTrySendMessagesToConsumers(ReadType readType, List entries) { lastNumberOfEntriesProcessed = 0; long totalMessagesSent = 0; long totalBytesSent = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java index 9c409028f6d23..9dcbeabc1f96b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java @@ -21,7 +21,6 @@ import static org.apache.pulsar.broker.BrokerTestUtil.createMockConsumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -109,57 +108,6 @@ public void shouldBlockStickyKeyHash_DoesNotBlockForNewEntry() { assertFalse(result); } - @Test - public void startBatch_IncrementsBatchLevel() { - DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", mock(UnblockingHandler.class)); - - tracker.startBatch(); - assertEquals(tracker.batchLevel, 1); - - tracker.startBatch(); - assertEquals(tracker.batchLevel, 2); - - tracker.startBatch(); - assertEquals(tracker.batchLevel, 3); - } - - @Test - public void endBatch_DecrementsBatchLevel() { - DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", mock(UnblockingHandler.class)); - tracker.startBatch(); - - tracker.endBatch(); - - assertEquals(tracker.batchLevel, 0); - } - - @Test - public void endBatch_InvokesUnblockingHandlerWhenUnblockedWhileBatching() { - // given a tracker with unblocking handler - UnblockingHandler unblockingHandler = mock(UnblockingHandler.class); - DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", unblockingHandler); - - // when a hash is draining - Consumer consumer1 = createMockConsumer("consumer1"); - tracker.addEntry(consumer1, 1); - // and batch starts - tracker.startBatch(); - - // when hash gets blocked - Consumer consumer2 = createMockConsumer("consumer2"); - 
tracker.shouldBlockStickyKeyHash(consumer2, 1); - // and it gets unblocked - tracker.reduceRefCount(consumer1, 1, false); - - // then no unblocking call should be done - verify(unblockingHandler, never()).stickyKeyHashUnblocked(anyInt()); - - // when batch ends - tracker.endBatch(); - // then unblocking call should be done - verify(unblockingHandler).stickyKeyHashUnblocked(-1); - } - @Test public void clear_RemovesAllEntries() { Consumer consumer = createMockConsumer("consumer1"); From fa60a7af73a00d94e2d3bd18c8810a0099bef693 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 16:41:24 +0200 Subject: [PATCH 10/28] Add test and more docs for OutsideCriticalSectionsExecutor --- .../OutsideCriticalSectionsExecutor.java | 19 ++++ .../OutsideCriticalSectionsExecutorTest.java | 87 +++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java index 77921d1a976c6..c625d62f7340e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java @@ -29,12 +29,21 @@ /** * Executor that runs tasks in the current thread when * there aren't any critical sections in execution. + * If there's a critical section in execution, the tasks are queued + * and postponed until all critical sections have been exited. + * The tasks are run on the thread that exited the last critical section. 
*/ public class OutsideCriticalSectionsExecutor implements Executor { private final AtomicInteger criticalSectionsCount = new AtomicInteger(); private final Queue queuedTasks = new ConcurrentLinkedQueue<>(); private final ReadWriteLock executionLock = new ReentrantReadWriteLock(); + /** + * Executes the given command at some time in the future. + * If there are no critical sections in execution, the command is executed immediately. + * If there are critical sections in execution, the command is queued and executed after all critical sections have + * been exited. + */ @Override public void execute(Runnable command) { executionLock.writeLock().lock(); @@ -49,6 +58,9 @@ public void execute(Runnable command) { } } + /** + * Enters a critical section. This method should be called before entering a critical section. + */ public void enterCriticalSection() { executionLock.readLock().lock(); try { @@ -58,12 +70,19 @@ public void enterCriticalSection() { } } + /** + * Exits a critical section. This method should be called after exiting a critical section. + */ public void exitCriticalSection() { if (criticalSectionsCount.decrementAndGet() == 0) { runQueuedTasks(); } } + /** + * Runs a callable which is a critical section. This method should be used when + * the result of the callable is needed and it should run as a critical section. + */ public T runCriticalSectionCallable(Callable callable) { executionLock.readLock().lock(); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java new file mode 100644 index 0000000000000..604d885430427 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.testng.annotations.Test; + +public class OutsideCriticalSectionsExecutorTest { + @Test + public void executeRunsCommandImmediatelyWhenNoCriticalSections() { + OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); + AtomicBoolean executed = new AtomicBoolean(false); + + executor.execute(() -> executed.set(true)); + + assertTrue(executed.get()); + } + + @Test + public void executeQueuesCommandWhenInCriticalSection() { + OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); + AtomicBoolean executed = new AtomicBoolean(false); + + executor.enterCriticalSection(); + executor.execute(() -> executed.set(true)); + assertFalse(executed.get()); + + executor.exitCriticalSection(); + assertTrue(executed.get()); + } + + @Test + public void runCriticalSectionCallableExecutesCallable() throws Exception { + OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); + String result = executor.runCriticalSectionCallable(() 
-> "test"); + + assertEquals(result, "test"); + } + + @Test + public void runCriticalSectionCallableHandlesException() { + OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); + + assertThrows(RuntimeException.class, () -> { + executor.runCriticalSectionCallable(() -> { + throw new Exception("test"); + }); + }); + } + + @Test + public void exitCriticalSectionDoesNotRunTasksWhenStillInCriticalSection() { + OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); + AtomicBoolean executed = new AtomicBoolean(false); + + executor.enterCriticalSection(); + executor.enterCriticalSection(); + executor.execute(() -> executed.set(true)); + assertFalse(executed.get()); + + executor.exitCriticalSection(); + assertFalse(executed.get()); + + executor.exitCriticalSection(); + assertTrue(executed.get()); + } +} \ No newline at end of file From ec898d6a6bd699dd0ff84282b36cdf8bd831c993 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 16:44:50 +0200 Subject: [PATCH 11/28] Adjust test logging - disable debug logging for key_shared related dispatcher classes --- pulsar-broker/src/test/resources/log4j2.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index b506421d5bb0a..7b3cd6a04fcca 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -36,7 +36,7 @@ - + From 7bc7b92ea60e965729dc0dd1e521e3497a4272fc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 17:27:02 +0200 Subject: [PATCH 12/28] Fix failing test --- ...ckyKeyDispatcherMultipleConsumersTest.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 7234f0caefc63..db2c94e408a59 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -560,6 +560,11 @@ public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInS dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + public synchronized void readMoreEntries() { + havePendingRead = true; + } + @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -567,6 +572,11 @@ protected void reScheduleReadInMs(long readAfterMs) { }; } else { dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { + @Override + public synchronized void readMoreEntries() { + havePendingRead = true; + } + @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -578,8 +588,11 @@ protected void reScheduleReadInMs(long readAfterMs) { consumerMockAvailablePermits.set(0); dispatcher.addConsumer(consumerMock); + // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + // call the overridden "readMoreEntries" method that sets the "havePendingRead" flag + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); @@ -588,6 +601,7 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay 
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); @@ -598,6 +612,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { @@ -609,6 +624,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); @@ -616,6 +632,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); @@ -643,6 +660,11 @@ public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSub dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + 
@Override + public synchronized void readMoreEntries() { + havePendingRead = true; + } + @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -650,6 +672,11 @@ protected void reScheduleReadInMs(long readAfterMs) { }; } else { dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { + @Override + public synchronized void readMoreEntries() { + havePendingRead = true; + } + @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -663,6 +690,8 @@ protected void reScheduleReadInMs(long readAfterMs) { // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + // call the overridden "readMoreEntries" method that sets the "havePendingRead" flag + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); @@ -671,6 +700,7 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); @@ -681,6 +711,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { @@ -692,6 +723,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is 
reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); @@ -699,6 +731,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); + dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); @@ -720,7 +753,7 @@ public void testNoBackoffDelayWhenDelayedMessages(boolean dispatchMessagesInSubs AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0); AtomicBoolean delayAllMessages = new AtomicBoolean(true); - AbstractPersistentDispatcherMultipleConsumers dispatcher; + PersistentDispatcherMultipleConsumers dispatcher; if (isKeyShared) { dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, @@ -732,6 +765,7 @@ protected void reScheduleReadInMs(long readAfterMs) { @Override public synchronized void readMoreEntries() { + havePendingRead = true; readMoreEntriesCalled.incrementAndGet(); } @@ -753,6 +787,7 @@ protected void reScheduleReadInMs(long readAfterMs) { @Override public synchronized void readMoreEntries() { + havePendingRead = true; readMoreEntriesCalled.incrementAndGet(); } @@ -780,6 +815,8 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata dispatcher.addConsumer(consumerMock); List entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)))); + dispatcher.readMoreEntries(); + readMoreEntriesCalled.set(0); dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called"); From 7bca0ad361490776b56c44e659897ae540e57aab Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 21 Jan 2025 19:38:24 +0200 Subject: [PATCH 13/28] Fix race condition in test --- .../api/KeySharedSubscriptionDisabledBrokerCacheTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index 8da86a63513c8..cfa287d4e1f26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -272,11 +272,11 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp for (int i = 0; i < 1000; i++) { String key = String.valueOf(random.nextInt(numberOfKeys)); //log.info("Producing message with key: {} value: {}", key, i); + remainingMessageValues.add(i); producer.newMessage() .key(key) .value(i) .send(); - remainingMessageValues.add(i); } // reconnect c2 @@ -296,11 +296,11 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp for (int i = 1000; i < 1100; i++) { String key = keysForC2List.get(random.nextInt(keysForC2List.size())); log.info("Producing message with key: {} value: {}", key, i); + remainingMessageValues.add(i); producer.newMessage() .key(key) .value(i) .send(); - remainingMessageValues.add(i); } Thread.sleep(2 * pauseTime); From c8d14f3f98223aff343413ba575b5429bbcee0f1 Mon Sep 17 00:00:00 2001 From: Lari Hotari 
Date: Wed, 22 Jan 2025 10:03:28 +0200 Subject: [PATCH 14/28] Revert "Add solution to PulsarMockBookKeeper for intercepting reads" This reverts commit 67605e700a39e27d2f198fddaeee55d2c9ac8755. --- .../client/PulsarMockBookKeeper.java | 8 +--- .../client/PulsarMockLedgerHandle.java | 2 +- .../client/PulsarMockReadHandle.java | 31 +++++--------- .../PulsarMockReadHandleInterceptor.java | 40 ------------------- 4 files changed, 13 insertions(+), 68 deletions(-) delete mode 100644 testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 344173c30918d..1e979206e16d3 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -40,8 +40,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import lombok.Getter; -import lombok.Setter; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -98,9 +96,6 @@ public static Collection getMockEnsemble() { final Queue addEntryResponseDelaysMillis = new ConcurrentLinkedQueue<>(); final List> failures = new ArrayList<>(); final List> addEntryFailures = new ArrayList<>(); - @Setter - @Getter - private volatile PulsarMockReadHandleInterceptor readHandleInterceptor; public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; @@ -255,8 +250,7 @@ public CompletableFuture execute() { return FutureUtils.exception(new BKException.BKUnauthorizedAccessException()); } else { return FutureUtils.value(new 
PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId, - lh.getLedgerMetadata(), lh.entries, - PulsarMockBookKeeper.this::getReadHandleInterceptor)); + lh.getLedgerMetadata(), lh.entries)); } }); } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index d30684e604670..aa61e541d0d6b 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -73,7 +73,7 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, this.digest = digest; this.passwd = Arrays.copyOf(passwd, passwd.length); - readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor); + readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 9f3f4969199ce..a4361f62254e4 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -41,36 +40,28 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; - private final Supplier readHandleInterceptorSupplier; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, - List entries, - Supplier 
readHandleInterceptorSupplier) { + List entries) { this.bk = bk; this.ledgerId = ledgerId; this.metadata = metadata; this.entries = entries; - this.readHandleInterceptorSupplier = readHandleInterceptorSupplier; } @Override public CompletableFuture readAsync(long firstEntry, long lastEntry) { return bk.getProgrammedFailure().thenComposeAsync((res) -> { - log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); - List seq = new ArrayList<>(); - long entryId = firstEntry; - while (entryId <= lastEntry && entryId < entries.size()) { - seq.add(entries.get((int) entryId++).duplicate()); - } - log.debug("Entries read: {}", seq); - LedgerEntriesImpl ledgerEntries = LedgerEntriesImpl.create(seq); - PulsarMockReadHandleInterceptor pulsarMockReadHandleInterceptor = readHandleInterceptorSupplier.get(); - if (pulsarMockReadHandleInterceptor != null) { - return pulsarMockReadHandleInterceptor.interceptReadAsync(ledgerId, firstEntry, lastEntry, - ledgerEntries); - } - return FutureUtils.value(ledgerEntries); - }); + log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); + List seq = new ArrayList<>(); + long entryId = firstEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + seq.add(entries.get((int) entryId++).duplicate()); + } + log.debug("Entries read: {}", seq); + + return FutureUtils.value(LedgerEntriesImpl.create(seq)); + }); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java deleted file mode 100644 index acee87b0f77f4..0000000000000 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandleInterceptor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.client; - -import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.client.api.LedgerEntries; - -/** - * Interceptor interface for intercepting read handle readAsync operations. - * This is useful for testing purposes, for example for introducing delays. - */ -public interface PulsarMockReadHandleInterceptor { - /** - * Intercepts the readAsync operation on a read handle. - * - * @param ledgerId ledger id - * @param firstEntry first entry to read - * @param lastEntry last entry to read - * @param entries entries that would be returned by the read operation - * @return CompletableFuture that will complete with the entries to return - */ - CompletableFuture interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, - LedgerEntries entries); -} From 799fab9f3c48918d2284e1ea6b11d0dc48cf515b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 10:03:37 +0200 Subject: [PATCH 15/28] Revert "Improve quiet time implementation in receiveMessages" This reverts commit e40461f85c9fd8a917797def51183485163e8bc7. 
--- .../apache/pulsar/broker/BrokerTestUtil.java | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index fee0ababe114d..8364cae53b223 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -39,7 +39,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -240,42 +239,33 @@ public static void receiveMessages(BiFunction, Message, Boole public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, Duration quietTimeout, Stream> consumers) { - long quietTimeoutNanos = quietTimeout.toNanos(); - AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); FutureUtil.waitForAll(consumers - .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, - lastMessageReceivedNanos)).toList()).join(); + .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join(); } // asynchronously receive messages from a consumer and handle them using the provided message handler // the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads // this is useful in tests where multiple consumers are needed to test the functionality - private static CompletableFuture receiveMessagesAsync(Consumer consumer, - long quietTimeoutNanos, - BiFunction, Message, Boolean> - messageHandler, - AtomicLong lastMessageReceivedNanos) { - return consumer.receiveAsync() - .orTimeout(quietTimeoutNanos, TimeUnit.NANOSECONDS) + private static 
CompletableFuture receiveMessagesAsync(Consumer consumer, Duration quietTimeout, + BiFunction, Message, Boolean> + messageHandler) { + CompletableFuture> receiveFuture = consumer.receiveAsync(); + return receiveFuture + .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS) .handle((msg, t) -> { - long currentNanos = System.nanoTime(); if (t != null) { if (t instanceof TimeoutException) { - if (currentNanos - lastMessageReceivedNanos.get() > quietTimeoutNanos) { - return false; - } else { - return true; - } + // cancel the receive future so that Pulsar client can clean up the resources + receiveFuture.cancel(false); + return false; } else { throw FutureUtil.wrapToCompletionException(t); } } - lastMessageReceivedNanos.set(currentNanos); return messageHandler.apply(consumer, msg); }).thenComposeAsync(receiveMore -> { if (receiveMore) { - return receiveMessagesAsync(consumer, quietTimeoutNanos, messageHandler, - lastMessageReceivedNanos); + return receiveMessagesAsync(consumer, quietTimeout, messageHandler); } else { return CompletableFuture.completedFuture(null); } From d8361bf4cb1bf58411db9efbe2c55cb476bf41ef Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 15:54:13 +0200 Subject: [PATCH 16/28] Fix test after reverting read handle interceptor changes --- .../api/KeySharedSubscriptionDisabledBrokerCacheTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index cfa287d4e1f26..b6a463a1955b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -125,11 +125,6 @@ public void resetDefaultNamespace() throws Exception { 
defaultConf.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()); } - @AfterMethod(alwaysRun = true) - public void resetInterceptor() { - pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor(null); - } - // Use a fixed seed to make the tests using random values deterministic // When a test fails, it's possible to re-run it to reproduce the issue private static final Random random = new Random(1); From eef83e9450f92f1b77d160990191d22eee1f6e08 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 15:40:06 +0200 Subject: [PATCH 17/28] Revert "Add test and more docs for OutsideCriticalSectionsExecutor" This reverts commit fa60a7af73a00d94e2d3bd18c8810a0099bef693. --- .../OutsideCriticalSectionsExecutor.java | 19 ---- .../OutsideCriticalSectionsExecutorTest.java | 87 ------------------- 2 files changed, 106 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java index c625d62f7340e..77921d1a976c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java @@ -29,21 +29,12 @@ /** * Executor that runs tasks in the current thread when * there aren't any critical sections in execution. - * If there's a critical section in execution, the tasks are queued - * and postponed until all critical sections have been exited. - * The tasks are run on the thread that exited the last critical section. 
*/ public class OutsideCriticalSectionsExecutor implements Executor { private final AtomicInteger criticalSectionsCount = new AtomicInteger(); private final Queue queuedTasks = new ConcurrentLinkedQueue<>(); private final ReadWriteLock executionLock = new ReentrantReadWriteLock(); - /** - * Executes the given command at some time in the future. - * If there are no critical sections in execution, the command is executed immediately. - * If there are critical sections in execution, the command is queued and executed after all critical sections have - * been exited. - */ @Override public void execute(Runnable command) { executionLock.writeLock().lock(); @@ -58,9 +49,6 @@ public void execute(Runnable command) { } } - /** - * Enters a critical section. This method should be called before entering a critical section. - */ public void enterCriticalSection() { executionLock.readLock().lock(); try { @@ -70,19 +58,12 @@ public void enterCriticalSection() { } } - /** - * Exits a critical section. This method should be called after exiting a critical section. - */ public void exitCriticalSection() { if (criticalSectionsCount.decrementAndGet() == 0) { runQueuedTasks(); } } - /** - * Runs a callable which is a critical section. This method should be used when - * the result of the callable is needed and it should run as a critical section. - */ public T runCriticalSectionCallable(Callable callable) { executionLock.readLock().lock(); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java deleted file mode 100644 index 604d885430427..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutorTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; -import java.util.concurrent.atomic.AtomicBoolean; -import org.testng.annotations.Test; - -public class OutsideCriticalSectionsExecutorTest { - @Test - public void executeRunsCommandImmediatelyWhenNoCriticalSections() { - OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); - AtomicBoolean executed = new AtomicBoolean(false); - - executor.execute(() -> executed.set(true)); - - assertTrue(executed.get()); - } - - @Test - public void executeQueuesCommandWhenInCriticalSection() { - OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); - AtomicBoolean executed = new AtomicBoolean(false); - - executor.enterCriticalSection(); - executor.execute(() -> executed.set(true)); - assertFalse(executed.get()); - - executor.exitCriticalSection(); - assertTrue(executed.get()); - } - - @Test - public void runCriticalSectionCallableExecutesCallable() throws Exception { - OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); - String result = executor.runCriticalSectionCallable(() 
-> "test"); - - assertEquals(result, "test"); - } - - @Test - public void runCriticalSectionCallableHandlesException() { - OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); - - assertThrows(RuntimeException.class, () -> { - executor.runCriticalSectionCallable(() -> { - throw new Exception("test"); - }); - }); - } - - @Test - public void exitCriticalSectionDoesNotRunTasksWhenStillInCriticalSection() { - OutsideCriticalSectionsExecutor executor = new OutsideCriticalSectionsExecutor(); - AtomicBoolean executed = new AtomicBoolean(false); - - executor.enterCriticalSection(); - executor.enterCriticalSection(); - executor.execute(() -> executed.set(true)); - assertFalse(executed.get()); - - executor.exitCriticalSection(); - assertFalse(executed.get()); - - executor.exitCriticalSection(); - assertTrue(executed.get()); - } -} \ No newline at end of file From 31db5fa158b5cce2ee45445b59c8f3ab8fb7106c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 15:40:13 +0200 Subject: [PATCH 18/28] Revert "Postpone removals after critical sections to prevent race conditions" This reverts commit 3913e6413a3c866639bbddd460aadc60ff1d7987. 
--- .../broker/service/DrainingHashesTracker.java | 94 ++++++++++++------ .../OutsideCriticalSectionsExecutor.java | 97 ------------------- ...tStickyKeyDispatcherMultipleConsumers.java | 23 +---- .../service/DrainingHashesTrackerTest.java | 52 ++++++++++ 4 files changed, 117 insertions(+), 149 deletions(-) delete mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 8ccfc49ddcc8f..fff20e86e668c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.PrimitiveIterator; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import lombok.ToString; @@ -51,9 +50,10 @@ public class DrainingHashesTracker { // optimize the memory consumption of the map by using primitive int keys private final Int2ObjectOpenHashMap drainingHashes = new Int2ObjectOpenHashMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + int batchLevel; + boolean unblockedWhileBatching; private final Map consumerDrainingHashesStatsMap = new ConcurrentHashMap<>(); - private final Executor removalExecutor; /** * Represents an entry in the draining hashes tracker. 
@@ -220,13 +220,8 @@ public interface UnblockingHandler { } public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler) { - this(dispatcherName, unblockingHandler, Runnable::run); - } - - public DrainingHashesTracker(String dispatcherName, UnblockingHandler unblockingHandler, Executor removalExecutor) { this.dispatcherName = dispatcherName; this.unblockingHandler = unblockingHandler; - this.removalExecutor = removalExecutor; } /** @@ -279,6 +274,39 @@ public void addEntry(Consumer consumer, int stickyHash) { } } + /** + * Start a batch operation. There could be multiple nested batch operations. + * The unblocking of sticky key hashes will be done only when the last batch operation ends. + */ + public void startBatch() { + lock.writeLock().lock(); + try { + batchLevel++; + } finally { + lock.writeLock().unlock(); + } + } + + /** + * End a batch operation. + */ + public void endBatch() { + boolean notifyUnblocking = false; + lock.writeLock().lock(); + try { + if (--batchLevel == 0 && unblockedWhileBatching) { + unblockedWhileBatching = false; + notifyUnblocking = true; + } + } finally { + lock.writeLock().unlock(); + } + // notify unblocking of the hash outside the lock + if (notifyUnblocking) { + unblockingHandler.stickyKeyHashUnblocked(-1); + } + } + /** * Reduce the reference count for a given sticky hash. 
* @@ -302,38 +330,40 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) { + "."); } if (entry.decrementRefCount()) { - removalExecutor.execute(() -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, - consumer.consumerId(), consumer.consumerName()); - } + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, + consumer.consumerId(), consumer.consumerName()); + } - DrainingHashEntry removed; - boolean notifyUnblocking = false; - lock.writeLock().lock(); - try { - removed = drainingHashes.remove(stickyHash); - if (removed.isBlocking()) { + DrainingHashEntry removed; + boolean notifyUnblocking = false; + lock.writeLock().lock(); + try { + removed = drainingHashes.remove(stickyHash); + if (removed.isBlocking()) { + if (batchLevel > 0) { + unblockedWhileBatching = true; + } else { notifyUnblocking = true; } - } finally { - lock.writeLock().unlock(); } + } finally { + lock.writeLock().unlock(); + } - // perform side-effects outside of the lock to reduce chances for deadlocks + // perform side-effects outside of the lock to reduce chances for deadlocks - // update the consumer specific stats - ConsumerDrainingHashesStats drainingHashesStats = - consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); - if (drainingHashesStats != null) { - drainingHashesStats.clearHash(stickyHash); - } + // update the consumer specific stats + ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyHash); + } - // notify unblocking of the hash outside the lock - if (notifyUnblocking) { - unblockingHandler.stickyKeyHashUnblocked(stickyHash); - } - }); + // notify unblocking of the hash outside the lock + if (notifyUnblocking) { + 
unblockingHandler.stickyKeyHashUnblocked(stickyHash); + } } else { if (log.isDebugEnabled()) { log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java deleted file mode 100644 index 77921d1a976c6..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OutsideCriticalSectionsExecutor.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Executor that runs tasks in the current thread when - * there aren't any critical sections in execution. 
- */ -public class OutsideCriticalSectionsExecutor implements Executor { - private final AtomicInteger criticalSectionsCount = new AtomicInteger(); - private final Queue queuedTasks = new ConcurrentLinkedQueue<>(); - private final ReadWriteLock executionLock = new ReentrantReadWriteLock(); - - @Override - public void execute(Runnable command) { - executionLock.writeLock().lock(); - try { - if (criticalSectionsCount.get() == 0) { - command.run(); - } else { - queuedTasks.add(command); - } - } finally { - executionLock.writeLock().unlock(); - } - } - - public void enterCriticalSection() { - executionLock.readLock().lock(); - try { - criticalSectionsCount.incrementAndGet(); - } finally { - executionLock.readLock().unlock(); - } - } - - public void exitCriticalSection() { - if (criticalSectionsCount.decrementAndGet() == 0) { - runQueuedTasks(); - } - } - - public T runCriticalSectionCallable(Callable callable) { - executionLock.readLock().lock(); - try { - criticalSectionsCount.incrementAndGet(); - return callable.call(); - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } - throw new RuntimeException(e); - } finally { - executionLock.readLock().unlock(); - exitCriticalSection(); - } - } - - private void runQueuedTasks() { - executionLock.writeLock().lock(); - try { - if (criticalSectionsCount.get() != 0) { - return; - } - Runnable command; - while ((command = queuedTasks.poll()) != null) { - command.run(); - } - } finally { - executionLock.writeLock().unlock(); - } - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 952106b5f5d64..1a3e2f706cba8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -48,7 +47,6 @@ import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector; import org.apache.pulsar.broker.service.ImpactedConsumersResult; -import org.apache.pulsar.broker.service.OutsideCriticalSectionsExecutor; import org.apache.pulsar.broker.service.PendingAcksMap; import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; @@ -76,8 +74,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private final DrainingHashesTracker drainingHashesTracker; private final RescheduleReadHandler rescheduleReadHandler; - private final OutsideCriticalSectionsExecutor outsideCriticalSectionsExecutor = - new OutsideCriticalSectionsExecutor(); PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { @@ -89,8 +85,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi this.drainingHashesRequired = keySharedMode == KeySharedMode.AUTO_SPLIT && !allowOutOfOrderDelivery; this.drainingHashesTracker = - drainingHashesRequired ? new DrainingHashesTracker(this.getName(), this::stickyKeyHashUnblocked, - outsideCriticalSectionsExecutor) : null; + drainingHashesRequired ? 
new DrainingHashesTracker(this.getName(), this::stickyKeyHashUnblocked) : null; this.rescheduleReadHandler = new RescheduleReadHandler(conf::getKeySharedUnblockingIntervalMs, topic.getBrokerService().executor(), this::cancelPendingRead, () -> reScheduleReadInMs(0), () -> havePendingRead, this::getReadMoreEntriesCallCount, () -> !redeliveryMessages.isEmpty()); @@ -154,12 +149,12 @@ public void handleRemoving(Consumer consumer, long ledgerId, long entryId, int s @Override public void startBatch() { - outsideCriticalSectionsExecutor.enterCriticalSection(); + drainingHashesTracker.startBatch(); } @Override public void endBatch() { - outsideCriticalSectionsExecutor.exitCriticalSection(); + drainingHashesTracker.endBatch(); } }); consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats); @@ -211,20 +206,8 @@ protected synchronized void clearComponentsAfterRemovedAllConsumers() { } } - @Override - protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { - return outsideCriticalSectionsExecutor.runCriticalSectionCallable( - () -> super.getMessagesToReplayNow(maxMessagesToRead)); - } - @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - return outsideCriticalSectionsExecutor.runCriticalSectionCallable(() -> { - return internalTrySendMessagesToConsumers(readType, entries); - }); - } - - private synchronized boolean internalTrySendMessagesToConsumers(ReadType readType, List entries) { lastNumberOfEntriesProcessed = 0; long totalMessagesSent = 0; long totalBytesSent = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java index 9dcbeabc1f96b..9c409028f6d23 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.createMockConsumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -108,6 +109,57 @@ public void shouldBlockStickyKeyHash_DoesNotBlockForNewEntry() { assertFalse(result); } + @Test + public void startBatch_IncrementsBatchLevel() { + DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", mock(UnblockingHandler.class)); + + tracker.startBatch(); + assertEquals(tracker.batchLevel, 1); + + tracker.startBatch(); + assertEquals(tracker.batchLevel, 2); + + tracker.startBatch(); + assertEquals(tracker.batchLevel, 3); + } + + @Test + public void endBatch_DecrementsBatchLevel() { + DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", mock(UnblockingHandler.class)); + tracker.startBatch(); + + tracker.endBatch(); + + assertEquals(tracker.batchLevel, 0); + } + + @Test + public void endBatch_InvokesUnblockingHandlerWhenUnblockedWhileBatching() { + // given a tracker with unblocking handler + UnblockingHandler unblockingHandler = mock(UnblockingHandler.class); + DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", unblockingHandler); + + // when a hash is draining + Consumer consumer1 = createMockConsumer("consumer1"); + tracker.addEntry(consumer1, 1); + // and batch starts + tracker.startBatch(); + + // when hash gets blocked + Consumer consumer2 = createMockConsumer("consumer2"); + tracker.shouldBlockStickyKeyHash(consumer2, 1); + // and it gets unblocked + tracker.reduceRefCount(consumer1, 1, false); + + // then no unblocking call should be done + verify(unblockingHandler, never()).stickyKeyHashUnblocked(anyInt()); + + // when batch ends + 
tracker.endBatch(); + // then unblocking call should be done + verify(unblockingHandler).stickyKeyHashUnblocked(-1); + } + @Test public void clear_RemovesAllEntries() { Consumer consumer = createMockConsumer("consumer1"); From 231b30073df554fb4d0d29f7073b1d3445f606f9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 15:54:42 +0200 Subject: [PATCH 19/28] Fix test after reverting previous changes --- .../apache/pulsar/broker/service/DrainingHashesTrackerTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java index 9c409028f6d23..3240bb745fe06 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java @@ -19,7 +19,9 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.createMockConsumer; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; From e734cdfc64f40a7be618f9567dccd0371ac0ff0d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 15:55:50 +0200 Subject: [PATCH 20/28] Add solution by Yubiao to prevent race condition in unblocking while filtering messages to replay --- ...rsistentStickyKeyDispatcherMultipleConsumers.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 
1a3e2f706cba8..305591fc255a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -536,6 +536,9 @@ private class ReplayPositionFilter implements Predicate { // tracks the available permits for each consumer for the duration of the filter usage // the filter is stateful and shouldn't be shared or reused later private final Map availablePermitsMap = new HashMap<>(); + // tracks the hashes that have been blocked during the filtering + // it is necessary to block all later messages after a hash gets blocked so that ordering is preserved + private final Set alreadyBlockedHashes = new HashSet<>(); @Override public boolean test(Position position) { @@ -553,25 +556,34 @@ public boolean test(Position position) { } return true; } + // check if the hash is already blocked, if so, then replaying of the position should be skipped + // to preserve ordering + if (alreadyBlockedHashes.contains(stickyKeyHash)) { + return false; + } // find the consumer for the sticky key hash Consumer consumer = selector.select(stickyKeyHash.intValue()); // skip replaying the message position if there's no assigned consumer if (consumer == null) { + alreadyBlockedHashes.add(stickyKeyHash); return false; } + // lookup the available permits for the consumer MutableInt availablePermits = availablePermitsMap.computeIfAbsent(consumer, k -> new MutableInt(getAvailablePermits(consumer))); // skip replaying the message position if the consumer has no available permits if (availablePermits.intValue() <= 0) { + alreadyBlockedHashes.add(stickyKeyHash); return false; } if (drainingHashesRequired && drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash.intValue())) { // the hash is draining and the consumer is not the draining consumer + alreadyBlockedHashes.add(stickyKeyHash); 
return false; } From a5d202915660b6c530710c42379893fbfcea35e3 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 17:32:48 +0200 Subject: [PATCH 21/28] Add "already blocked hashes" solution to dispatching phase --- ...tStickyKeyDispatcherMultipleConsumers.java | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 305591fc255a9..8bddbde02c974 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -20,6 +20,8 @@ import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; import com.google.common.annotations.VisibleForTesting; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -407,6 +409,8 @@ private Map> filterAndGroupEntriesForDispatching(List blockedByHashConsumers = lookAheadAllowed && readType == ReadType.Normal ? new HashSet<>() : null; // in replay read mode, keep track of consumers for entries, used for look-ahead check Set consumersForEntriesForLookaheadCheck = lookAheadAllowed ? 
new HashSet<>() : null; + // track already blocked hashes to block any further messages with the same hash + IntSet alreadyBlockedHashes = new IntOpenHashSet(); for (Entry inputEntry : entries) { EntryAndMetadata entry; @@ -419,24 +423,29 @@ private Map> filterAndGroupEntriesForDispatching(List new MutableInt(getAvailablePermits(consumer))); - // a consumer was found for the sticky key hash and the entry can be dispatched - if (permits.intValue() > 0 - && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) { - // decrement the permits for the consumer - permits.decrement(); - // allow the entry to be dispatched - dispatchEntry = true; + // check if the hash is already blocked + boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash); + if (!hashIsAlreadyBlocked) { + consumer = selector.select(stickyKeyHash); + if (consumer != null) { + if (lookAheadAllowed) { + consumersForEntriesForLookaheadCheck.add(consumer); + } + blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? 
new MutableBoolean(false) : null; + MutableInt permits = + permitsForConsumer.computeIfAbsent(consumer, + k -> new MutableInt(getAvailablePermits(k))); + // a consumer was found for the sticky key hash and the entry can be dispatched + if (permits.intValue() > 0 + && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) { + // decrement the permits for the consumer + permits.decrement(); + // allow the entry to be dispatched + dispatchEntry = true; + } } } if (dispatchEntry) { @@ -445,6 +454,10 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) { entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>()); consumerEntries.add(entry); } else { + if (!hashIsAlreadyBlocked) { + // the hash is blocked, add it to the set of blocked hashes + alreadyBlockedHashes.add(stickyKeyHash); + } if (blockedByHash != null && blockedByHash.isTrue()) { // the entry is blocked by hash, add the consumer to the blocked set blockedByHashConsumers.add(consumer); From 450069afd2f34e81efafabaa78bcb89d58496b09 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 18:30:22 +0200 Subject: [PATCH 22/28] Run test with most classic and PIP-379 implementation --- .../api/KeySharedSubscriptionDisabledBrokerCacheTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index b6a463a1955b4..476c288a41ed2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -57,6 +57,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import 
org.testng.annotations.Factory; import org.testng.annotations.Test; @Test(groups = "broker-impl") @@ -67,7 +68,7 @@ public class KeySharedSubscriptionDisabledBrokerCacheTest extends ProducerConsum private final KeySharedImplementationType implementationType; // Comment out the next line (Factory annotation) to run tests manually in IntelliJ, one-by-one - //@Factory + @Factory public static Object[] createTestInstances() { return KeySharedImplementationType.generateTestInstances(KeySharedSubscriptionDisabledBrokerCacheTest::new); } From 06827df01022d656345386a8b9cc9ee6424257ae Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 07:19:08 +0200 Subject: [PATCH 23/28] Revert havePendingReplayRead related changes --- ...PersistentDispatcherMultipleConsumers.java | 32 ++++----------- ...ckyKeyDispatcherMultipleConsumersTest.java | 39 +------------------ 2 files changed, 9 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 29c22642ac785..82b96c365072f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -335,13 +335,6 @@ public synchronized void readMoreEntries() { } return; } - if (havePendingReplayRead) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Skipping read for the topic, Due to replay in-progress.", topic.getName(), - getSubscriptionName()); - } - return; - } if (isSendInProgress()) { // we cannot read more entries while sending the previous batch // otherwise we could re-read the same entries and send duplicates @@ -386,23 +379,13 @@ public synchronized void readMoreEntries() { long bytesToRead = 
calculateResult.getRight(); if (messagesToRead == -1 || bytesToRead == -1) { - // Skip read as topic/dispatcher has exceed the dispatch rate + // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete. return; } Set messagesToReplayNow = canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet(); if (!messagesToReplayNow.isEmpty()) { - // before replaying, cancel possible pending read that is waiting for more entries - cancelPendingRead(); - if (havePendingRead) { - // skip read since a pending read is already in progress which cannot be cancelled - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Skipping replay read for the topic, Due to pending read in-progress.", - topic.getName(), getSubscriptionName()); - } - return; - } if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(), consumerList.size()); @@ -632,6 +615,13 @@ protected Pair calculateToRead(int currentTotalAvailablePermits) } } + if (havePendingReplayRead) { + if (log.isDebugEnabled()) { + log.debug("[{}] Skipping replay while awaiting previous read to complete", name); + } + return Pair.of(-1, -1L); + } + // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException messagesToRead = Math.max(messagesToRead, 1); bytesToRead = Math.max(bytesToRead, 1); @@ -727,12 +717,6 @@ public SubType getType() { public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; if (readType == ReadType.Normal) { - if (!havePendingRead) { - log.debug("Discarding read entries as there is no pending read"); - entries.forEach(Entry::release); - readMoreEntriesAsync(); - return; - } havePendingRead = false; } else { havePendingReplayRead = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index db2c94e408a59..7234f0caefc63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -560,11 +560,6 @@ public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInS dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - public synchronized void readMoreEntries() { - havePendingRead = true; - } - @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -572,11 +567,6 @@ protected void reScheduleReadInMs(long readAfterMs) { }; } else { dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - public synchronized void readMoreEntries() { - havePendingRead = true; - } - @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -588,11 +578,8 @@ protected void reScheduleReadInMs(long readAfterMs) { consumerMockAvailablePermits.set(0); dispatcher.addConsumer(consumerMock); - // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - // call the overridden "readMoreEntries" method that sets the "havePendingRead" flag - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); @@ -601,7 +588,6 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay 
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); @@ -612,7 +598,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { @@ -624,7 +609,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); @@ -632,7 +616,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); @@ -660,11 +643,6 @@ public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSub dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - 
@Override - public synchronized void readMoreEntries() { - havePendingRead = true; - } - @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -672,11 +650,6 @@ protected void reScheduleReadInMs(long readAfterMs) { }; } else { dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - public synchronized void readMoreEntries() { - havePendingRead = true; - } - @Override protected void reScheduleReadInMs(long readAfterMs) { retryDelays.add(readAfterMs); @@ -690,8 +663,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - // call the overridden "readMoreEntries" method that sets the "havePendingRead" flag - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); @@ -700,7 +671,6 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); @@ -711,7 +681,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { @@ -723,7 +692,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is 
reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); @@ -731,7 +699,6 @@ protected void reScheduleReadInMs(long readAfterMs) { // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readMoreEntries(); dispatcher.readEntriesComplete(new ArrayList<>(entries), PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); @@ -753,7 +720,7 @@ public void testNoBackoffDelayWhenDelayedMessages(boolean dispatchMessagesInSubs AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0); AtomicBoolean delayAllMessages = new AtomicBoolean(true); - PersistentDispatcherMultipleConsumers dispatcher; + AbstractPersistentDispatcherMultipleConsumers dispatcher; if (isKeyShared) { dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, @@ -765,7 +732,6 @@ protected void reScheduleReadInMs(long readAfterMs) { @Override public synchronized void readMoreEntries() { - havePendingRead = true; readMoreEntriesCalled.incrementAndGet(); } @@ -787,7 +753,6 @@ protected void reScheduleReadInMs(long readAfterMs) { @Override public synchronized void readMoreEntries() { - havePendingRead = true; readMoreEntriesCalled.incrementAndGet(); } @@ -815,8 +780,6 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata dispatcher.addConsumer(consumerMock); List entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 
1)))); - dispatcher.readMoreEntries(); - readMoreEntriesCalled.set(0); dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called"); From 98942d7d12d33602999489017db511eb6d62c24c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 07:33:38 +0200 Subject: [PATCH 24/28] Validate that no exceptions were thrown in message handler --- ...edSubscriptionDisabledBrokerCacheTest.java | 59 +++++++++++-------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index 476c288a41ed2..301478001ba3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.SoftAssertions.assertSoftly; import static org.testng.Assert.fail; import java.time.Duration; import java.util.ArrayList; @@ -189,33 +190,42 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp AtomicBoolean c2MessagesShouldBeUnacked = new AtomicBoolean(true); Set keysForC2 = new HashSet<>(); AtomicLong lastMessageTimestamp = new AtomicLong(System.currentTimeMillis()); + List exceptionsInHandler = Collections.synchronizedList(new ArrayList<>()); Map> keyPositions = new HashMap<>(); MessageListener messageHandler = (consumer, msg) -> { lastMessageTimestamp.set(System.currentTimeMillis()); 
synchronized (this) { - String key = msg.getKey(); - if (c2MessagesShouldBeUnacked.get() && keysForC2.contains(key)) { - unackedMessages.add(Pair.of(consumer, msg)); - return; - } - long delayMillis = ThreadLocalRandom.current().nextLong(25, 50); - CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS).execute(() -> - consumer.acknowledgeAsync(msg)); - MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); - Position currentPosition = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); - Pair prevPair = keyPositions.get(key); - if (prevPair != null && prevPair.getLeft().compareTo(currentPosition) > 0) { - boolean isDuplicate = !remainingMessageValues.contains(msg.getValue()); - log.error("key: {} value: {} prev: {}/{} current: {}/{} duplicate: {}", key, msg.getValue(), prevPair.getLeft(), - prevPair.getRight(), currentPosition, consumer.getConsumerName(), isDuplicate); - fail("out of order"); - } - keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName())); - boolean removed = remainingMessageValues.remove(msg.getValue()); - if (!removed) { - // duplicates are possible during reconnects, this is not an error - log.warn("Duplicate message: {} value: {}", msg.getMessageId(), msg.getValue()); + try { + String key = msg.getKey(); + if (c2MessagesShouldBeUnacked.get() && keysForC2.contains(key)) { + unackedMessages.add(Pair.of(consumer, msg)); + return; + } + long delayMillis = ThreadLocalRandom.current().nextLong(25, 50); + CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS).execute(() -> + consumer.acknowledgeAsync(msg)); + MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId(); + Position currentPosition = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); + Pair prevPair = keyPositions.get(key); + if (prevPair != null && prevPair.getLeft().compareTo(currentPosition) > 0) { + boolean isDuplicate = !remainingMessageValues.contains(msg.getValue()); + log.error("key: {} value: {} prev: {}/{} 
current: {}/{} duplicate: {}", key, msg.getValue(), + prevPair.getLeft(), + prevPair.getRight(), currentPosition, consumer.getConsumerName(), isDuplicate); + fail("out of order"); + } + keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName())); + boolean removed = remainingMessageValues.remove(msg.getValue()); + if (!removed) { + // duplicates are possible during reconnects, this is not an error + log.warn("Duplicate message: {} value: {}", msg.getMessageId(), msg.getValue()); + } + } catch (Throwable t) { + exceptionsInHandler.add(t); + if (!(t instanceof AssertionError)) { + log.error("Error in message handler", t); + } } } }; @@ -320,7 +330,10 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp }); try { - assertThat(remainingMessageValues).isEmpty(); + assertSoftly(softly -> { + softly.assertThat(remainingMessageValues).as("remainingMessageValues").isEmpty(); + softly.assertThat(exceptionsInHandler).as("exceptionsInHandler").isEmpty(); + }); } finally { logTopicStats(topic); } From 65401dc9b6e0772c97f89c9ac1e66dd9b1f4e605 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 07:37:03 +0200 Subject: [PATCH 25/28] Make failing configurable when out-of-order message is a duplicate --- .../api/KeySharedSubscriptionDisabledBrokerCacheTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index 301478001ba3c..cf9c893a10ca4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -163,6 +163,9 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp String topic = 
newUniqueName("testMessageOrderInSingleConsumerReconnect"); int numberOfKeys = 100; long pauseTime = 100L; + // don't fail if duplicates are out-of-order + // it's possible to change this setting while experimenting + boolean failOnDuplicatesOutOfOrder = false; @Cleanup PulsarClient pulsarClient2 = PulsarClient.builder() @@ -213,7 +216,9 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp log.error("key: {} value: {} prev: {}/{} current: {}/{} duplicate: {}", key, msg.getValue(), prevPair.getLeft(), prevPair.getRight(), currentPosition, consumer.getConsumerName(), isDuplicate); - fail("out of order"); + if (!isDuplicate || failOnDuplicatesOutOfOrder) { + fail("out of order"); + } } keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName())); boolean removed = remainingMessageValues.remove(msg.getValue()); From ddbe35b881aff8e1c40a5096febfefef011625e1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 07:57:41 +0200 Subject: [PATCH 26/28] Improve test logging --- .../KeySharedSubscriptionDisabledBrokerCacheTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index cf9c893a10ca4..616b4b066e6eb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -213,11 +213,14 @@ public void testMessageOrderInSingleConsumerReconnect(KeySharedImplementationTyp Pair prevPair = keyPositions.get(key); if (prevPair != null && prevPair.getLeft().compareTo(currentPosition) > 0) { boolean isDuplicate = !remainingMessageValues.contains(msg.getValue()); - log.error("key: {} value: {} prev: {}/{} current: 
{}/{} duplicate: {}", key, msg.getValue(), - prevPair.getLeft(), - prevPair.getRight(), currentPosition, consumer.getConsumerName(), isDuplicate); + String errorMessage = String.format( + "out of order: key: %s value: %s prev: %s/%s current: %s/%s duplicate: %s", + key, msg.getValue(), + prevPair.getLeft(), prevPair.getRight(), + currentPosition, consumer.getConsumerName(), isDuplicate); + log.error(errorMessage); if (!isDuplicate || failOnDuplicatesOutOfOrder) { - fail("out of order"); + fail(errorMessage); } } keyPositions.put(key, Pair.of(currentPosition, consumer.getConsumerName())); From 4b0f08b40272c44b83a6b39ff7dffcc7bf7cfd05 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 08:19:36 +0200 Subject: [PATCH 27/28] Remove unused import --- .../client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java index 616b4b066e6eb..45f776b41e78a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionDisabledBrokerCacheTest.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.SoftAssertions.assertSoftly; import static org.testng.Assert.fail; import java.time.Duration; From be9b80c6e1b52d4dd675ea6bfca4a1e95bada536 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 Jan 2025 08:45:15 +0200 Subject: [PATCH 28/28] Revert unrelated DrainingHashesTracker changes --- .../broker/service/DrainingHashesTracker.java | 2 +- .../service/DrainingHashesTrackerTest.java | 19 
+++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index fff20e86e668c..9bc5c5f1e44ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -340,7 +340,7 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) { lock.writeLock().lock(); try { removed = drainingHashes.remove(stickyHash); - if (removed.isBlocking()) { + if (!closing && removed.isBlocking()) { if (batchLevel > 0) { unblockedWhileBatching = true; } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java index 3240bb745fe06..ecb20beeb648a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DrainingHashesTrackerTest.java @@ -191,4 +191,23 @@ public void unblockingHandler_InvokesStickyKeyHashUnblocked() { // then unblocking call should be done verify(unblockingHandler).stickyKeyHashUnblocked(1); } + + @Test + public void unblockingHandler_DoesNotInvokeStickyKeyHashUnblockedWhenClosing() { + // given a tracker with unblocking handler + UnblockingHandler unblockingHandler = mock(UnblockingHandler.class); + DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", unblockingHandler); + + // when a hash is draining + Consumer consumer = createMockConsumer("consumer1"); + tracker.addEntry(consumer, 1); + // and hash gets blocked + Consumer consumer2 = createMockConsumer("consumer2"); + tracker.shouldBlockStickyKeyHash(consumer2, 1); + // and hash gets
unblocked + tracker.reduceRefCount(consumer, 1, true); + + // then no unblocking call should be done + verify(unblockingHandler, never()).stickyKeyHashUnblocked(anyInt()); + } } \ No newline at end of file