From 3cbf13d53d65d553523cb40ea0d1bc5eadc2ab54 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 23 Jan 2025 17:22:32 +0800 Subject: [PATCH] fix issue --- ...entDispatcherMultipleConsumersClassic.java | 1 + ...ntryCacheKeySharedSubscriptionV30Test.java | 41 +++++++++++-------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 22b82c51885ae..f69bd4c945ea8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -639,6 +639,7 @@ public final synchronized void readEntriesComplete(List entries, Object c havePendingReplayRead = false; if (shouldSkipNextReplaying && readType == ReadType.Replay) { shouldSkipNextReplaying = false; + readMoreEntriesAsync(); return; } } diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java index ae201b4ef1914..a4823f923bda7 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.java @@ -214,12 +214,13 @@ public void testNormalReadAfterPendingReplay() throws Exception { public Object[][] testCasesAfterClosedAllConsumers() { return new Object[][] { {"testCancelPendingReadWillNotCancelReplay"}, - {"testRewindImmediately"} + {"testRewindImmediately"}, + {"testRepeatedDelivery"} }; } @Test(timeOut = 180 * 1000, dataProvider = "testCasesAfterClosedAllConsumers") - public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exception { + public void testMixedReplayReadingAndNormalReading(String testCase) throws Exception { // The message with "k1" will be sent to consumer1. String k1 = "11"; // The message with "k2" will be sent to consumer2. @@ -279,14 +280,6 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc Awaitility.await().untilAsserted(() -> { assertTrue(dispatcher.isHavePendingRead()); }); - // Complete the pending read. - for (int i = 0; i < 20; i++) { - producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send(); - messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get()); - } - Awaitility.await().untilAsserted(() -> { - assertFalse(dispatcher.isHavePendingRead()); - }); // Trigger a replay reading through close consumer1 // - Inject a delay for the next replay read. @@ -322,13 +315,28 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc }; doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong()); doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong()); + ml.currentLedger = spyFirstLedger; // Trigger a "cancelPendingRead", but the operation will do nothing because of there is no pending read. // - after removing all consumers and adding a new consumer, broker will call a "cursor.cancelPendingRead" // and "cursor.rewind". consumer1.close(); + Awaitility.await().untilAsserted(() -> { + assertTrue(dispatcher.isHavePendingReplayRead()); + }); + // Complete the pending read. + for (int i = 0; i < 20; i++) { + producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send(); + messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get()); + } + Awaitility.await().untilAsserted(() -> { + assertFalse(dispatcher.isHavePendingRead()); + }); consumer2.close(); - int queueSize3 = "testRewindImmediately".equals(testCase) ? 0 : 100; + int queueSize3 = 1000; + if ("testRewindImmediately".equals(testCase)) { + queueSize3 = 0; + } ConsumerImpl consumer3 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) // 4 .topic(topic) .subscriptionName(subName) @@ -341,7 +349,7 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc // This motivation of this verify is here: https://github.com/apache/pulsar/pull/23855#issuecomment-2597522865. if ("testCancelPendingReadWillNotCancelReplay".equals(testCase)) { Awaitility.await().untilAsserted(() -> { - assertTrue(dispatcher.isHavePendingRead()); + assertTrue(dispatcher.isHavePendingReplayRead()); }); } // Verify 2: "cursor.rewind" will be called immediately, without wait for the next replay read. @@ -356,23 +364,24 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc } // Verify 3: all messages(including "replay red" and "normal read") will be received in order. + // This check is then main purpose of the PR https://github.com/apache/pulsar/pull/23803. for (int i = 0; i < 20; i++) { producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send(); messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get()); } replyReadSignal.countDown(); - if (queueSize3 > 2) { + if ("testRepeatedDelivery".equals(testCase)) { Awaitility.await().untilAsserted(() -> { - assertTrue(consumer3.numMessagesInQueue() > 2); + assertTrue(consumer3.numMessagesInQueue() >= messagesThatWillBeAckedAtLast.size()); }); } - Thread.sleep(1000 * 3); List messagesReceived = new ArrayList<>(); while (true) { + log.info("received msg count: {}", messagesReceived.size()); if (messagesReceived.size() < messagesThatWillBeAckedAtLast.size()) { Message msg = consumer3.receive(); messagesReceived.add(msg.getValue()); - consumer3.acknowledge(msg); + consumer3.acknowledgeAsync(msg); } else { break; }