Skip to content

Commit

Permalink
fix issue
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jan 23, 2025
1 parent 8e0947d commit 3cbf13d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
havePendingReplayRead = false;
if (shouldSkipNextReplaying && readType == ReadType.Replay) {
shouldSkipNextReplaying = false;
readMoreEntriesAsync();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ public void testNormalReadAfterPendingReplay() throws Exception {
public Object[][] testCasesAfterClosedAllConsumers() {
return new Object[][] {
{"testCancelPendingReadWillNotCancelReplay"},
{"testRewindImmediately"}
{"testRewindImmediately"},
{"testRepeatedDelivery"}
};
}

@Test(timeOut = 180 * 1000, dataProvider = "testCasesAfterClosedAllConsumers")
public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exception {
public void testMixedReplayReadingAndNormalReading(String testCase) throws Exception {
// The message with "k1" will be sent to consumer1.
String k1 = "11";
// The message with "k2" will be sent to consumer2.
Expand Down Expand Up @@ -279,14 +280,6 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingRead());
});
// Complete the pending read.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get());
}
Awaitility.await().untilAsserted(() -> {
assertFalse(dispatcher.isHavePendingRead());
});

// Trigger a replay reading through close consumer1
// - Inject a delay for the next replay read.
Expand Down Expand Up @@ -322,13 +315,28 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc
};
doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong());
doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong());
ml.currentLedger = spyFirstLedger;

// Trigger a "cancelPendingRead", but the operation will do nothing because of there is no pending read.
// - after removing all consumers and adding a new consumer, broker will call a "cursor.cancelPendingRead"
// and "cursor.rewind".
consumer1.close();
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingReplayRead());
});
// Complete the pending read.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get());
}
Awaitility.await().untilAsserted(() -> {
assertFalse(dispatcher.isHavePendingRead());
});
consumer2.close();
int queueSize3 = "testRewindImmediately".equals(testCase) ? 0 : 100;
int queueSize3 = 1000;
if ("testRewindImmediately".equals(testCase)) {
queueSize3 = 0;
}
ConsumerImpl<Integer> consumer3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 4
.topic(topic)
.subscriptionName(subName)
Expand All @@ -341,7 +349,7 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc
// This motivation of this verify is here: https://github.com/apache/pulsar/pull/23855#issuecomment-2597522865.
if ("testCancelPendingReadWillNotCancelReplay".equals(testCase)) {
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingRead());
assertTrue(dispatcher.isHavePendingReplayRead());
});
}
// Verify 2: "cursor.rewind" will be called immediately, without wait for the next replay read.
Expand All @@ -356,23 +364,24 @@ public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exc
}

// Verify 3: all messages(including "replay red" and "normal read") will be received in order.
// This check is then main purpose of the PR https://github.com/apache/pulsar/pull/23803.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get());
}
replyReadSignal.countDown();
if (queueSize3 > 2) {
if ("testRepeatedDelivery".equals(testCase)) {
Awaitility.await().untilAsserted(() -> {
assertTrue(consumer3.numMessagesInQueue() > 2);
assertTrue(consumer3.numMessagesInQueue() >= messagesThatWillBeAckedAtLast.size());
});
}
Thread.sleep(1000 * 3);
List<Integer> messagesReceived = new ArrayList<>();
while (true) {
log.info("received msg count: {}", messagesReceived.size());
if (messagesReceived.size() < messagesThatWillBeAckedAtLast.size()) {
Message<Integer> msg = consumer3.receive();
messagesReceived.add(msg.getValue());
consumer3.acknowledge(msg);
consumer3.acknowledgeAsync(msg);
} else {
break;
}
Expand Down

0 comments on commit 3cbf13d

Please sign in to comment.