Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jan 23, 2025
1 parent d673504 commit 8e0947d
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
// If there is a delayed "cursor.rewind" after the pending read, the consumers that will be
// added before the "cursor.rewind" will have a same "recent joined position", which is the
// same as "mark deleted position +1", so we can skip this adding.
&& !shouldRewindBeforeReadingOrReplaying) {
&& !shouldRewindBeforeReading) {
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
sortRecentlyJoinedConsumersIfNeeded();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,47 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumersClassic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -78,6 +89,304 @@ protected void doInitConf() throws Exception {
this.conf.setSubscriptionKeySharedUseClassicPersistentImplementation(true);
}

@Test(timeOut = 180 * 1000)
public void testNormalReadAfterPendingReplay() throws Exception {
// The message with "k1" will be sent to consumer1.
String k1 = "11";
// The message with "k2" will be sent to consumer2.
String k2 = "12";
// The message with "k3" will be sent to consumer3, and will be sent to consumer2 if consumer3 is offline.
String k3 = "3";

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subName = "my-sub";
final DefaultThreadFactory threadFactory =
new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread"));
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subName, MessageId.earliest);
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();

// Make a scenario:
// - consumer1: stuck
// - consumer2: acked all messages that it received.
ConsumerImpl<Integer> consumer1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 1
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("c1")
.subscribe();
ConsumerImpl<Integer> consumer2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 4
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("c2")
.subscribe();
AtomicInteger msgGeneratorForK1 = new AtomicInteger(0);
AtomicInteger msgGeneratorForK2 = new AtomicInteger(10000);
producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send();
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k2).value(msgGeneratorForK2.incrementAndGet()).send();
}
Awaitility.await().untilAsserted(() -> {
log.info("c1 queue size: {}", consumer1.getTotalIncomingMessages());
log.info("c2 queue size: {}", consumer2.getTotalIncomingMessages());
assertTrue(1 <= consumer1.getTotalIncomingMessages());
assertTrue(1 <= consumer2.getTotalIncomingMessages());
});
ReceivedMessages<Integer> receivedMessages1 = ackAllMessages(consumer2);
assertEquals(receivedMessages1.getMessagesReceived().size(), 20);
final PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
final PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
final PersistentStickyKeyDispatcherMultipleConsumersClassic dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumersClassic) persistentSubscription.getDispatcher();
// Verify: consumer2 is waiting for new messages.
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingRead());
});

// Trigger a replay reading through close consumer1
// - Inject a delay for the next replay read.
LedgerHandle firstLedger = ml.currentLedger;
LedgerHandle spyFirstLedger = spy(firstLedger);
CountDownLatch replyReadSignal = new CountDownLatch(1);
AtomicBoolean replayReadWasTriggered = new AtomicBoolean();
Answer answer = invocation -> {
long firstEntry = (long) invocation.getArguments()[0];
log.info("replay reading: {}", firstEntry);
if (firstEntry == 0) {
replayReadWasTriggered.set(true);
final CompletableFuture res = new CompletableFuture<>();
threadFactory.newThread(() -> {
try {
replyReadSignal.await();
CompletableFuture<LedgerEntries> future =
(CompletableFuture<LedgerEntries>) invocation.callRealMethod();
future.thenAccept(v -> {
res.complete(v);
}).exceptionally(ex -> {
res.completeExceptionally(ex);
return null;
});
} catch (Throwable ex) {
res.completeExceptionally(ex);
}
}).start();
return res;
} else {
return invocation.callRealMethod();
}
};
doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong());
doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong());
ml.currentLedger = spyFirstLedger;
consumer1.close();
Awaitility.await().until(() -> replayReadWasTriggered.get());

// Verify: the next normal reading will be skipped because of there is a pending replay read.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send();
}
ReceivedMessages<Integer> receivedMessages2 = ackAllMessages(consumer2);
assertEquals(receivedMessages2.getMessagesReceived().size(), 0);

// Verify: all messages with the key "k1" will be received in order.
replyReadSignal.countDown();
Thread.sleep(1000 * 3);
ReceivedMessages<Integer> receivedMessages3 = ackAllMessages(consumer2);
List<Integer> messagesReceived =
receivedMessages3.getMessagesReceived().stream().map(e -> e.getRight()).collect(Collectors.toList());
assertEquals(messagesReceived.size(), 21);
List<Integer> messagesReceivedSorted = new ArrayList<>(messagesReceived);
Collections.sort(messagesReceivedSorted);
assertEquals(messagesReceivedSorted, messagesReceivedSorted);

// Cleanup.
producer.close();
consumer2.close();
admin.topics().delete(topic, false);
}

@DataProvider
public Object[][] testCasesAfterClosedAllConsumers() {
return new Object[][] {
{"testCancelPendingReadWillNotCancelReplay"},
{"testRewindImmediately"}
};
}

@Test(timeOut = 180 * 1000, dataProvider = "testCasesAfterClosedAllConsumers")
public void testCancelPendingReadWillNotCancelReplay(String testCase) throws Exception {
// The message with "k1" will be sent to consumer1.
String k1 = "11";
// The message with "k2" will be sent to consumer2.
String k2 = "12";
// The message with "k3" will be sent to consumer3, and will be sent to consumer2 if consumer3 is offline.
String k3 = "3";

final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subName = "my-sub";
final DefaultThreadFactory threadFactory =
new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread"));
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subName, MessageId.earliest);
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();

// Make a scenario:
// - consumer1: stuck
// - consumer2: acked all messages that it received.
List<Integer> messagesThatWillBeAckedAtLast = new ArrayList<>();
ConsumerImpl<Integer> consumer1 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 1
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("c1")
.subscribe();
ConsumerImpl<Integer> consumer2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 4
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("c2")
.subscribe();
AtomicInteger msgGeneratorForK1 = new AtomicInteger(0);
AtomicInteger msgGeneratorForK2 = new AtomicInteger(1000);
AtomicInteger msgGeneratorForK3 = new AtomicInteger(1000_000);
producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get());
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k2).value(msgGeneratorForK2.incrementAndGet()).send();
}
Awaitility.await().untilAsserted(() -> {
log.info("c1 queue size: {}", consumer1.getTotalIncomingMessages());
log.info("c2 queue size: {}", consumer2.getTotalIncomingMessages());
assertTrue(1 <= consumer1.getTotalIncomingMessages());
assertTrue(1 <= consumer2.getTotalIncomingMessages());
});
ReceivedMessages<Integer> receivedMessages1 = ackAllMessages(consumer2);
assertEquals(receivedMessages1.getMessagesReceived().size(), 20);
final PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
final PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
final PersistentStickyKeyDispatcherMultipleConsumersClassic dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumersClassic) persistentSubscription.getDispatcher();
// Verify: consumer2 is waiting for new messages.
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingRead());
});
// Complete the pending read.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k3).value(msgGeneratorForK3.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK3.get());
}
Awaitility.await().untilAsserted(() -> {
assertFalse(dispatcher.isHavePendingRead());
});

// Trigger a replay reading through close consumer1
// - Inject a delay for the next replay read.
LedgerHandle firstLedger = ml.currentLedger;
LedgerHandle spyFirstLedger = spy(firstLedger);
CountDownLatch replyReadSignal = new CountDownLatch(1);
AtomicBoolean replayReadWasTriggered = new AtomicBoolean();
Answer answer = invocation -> {
long firstEntry = (long) invocation.getArguments()[0];
log.info("replay reading: {}", firstEntry);
if (firstEntry == 0) {
replayReadWasTriggered.set(true);
final CompletableFuture res = new CompletableFuture<>();
threadFactory.newThread(() -> {
try {
replyReadSignal.await();
CompletableFuture<LedgerEntries> future =
(CompletableFuture<LedgerEntries>) invocation.callRealMethod();
future.thenAccept(v -> {
res.complete(v);
}).exceptionally(ex -> {
res.completeExceptionally(ex);
return null;
});
} catch (Throwable ex) {
res.completeExceptionally(ex);
}
}).start();
return res;
} else {
return invocation.callRealMethod();
}
};
doAnswer(answer).when(spyFirstLedger).readAsync(anyLong(), anyLong());
doAnswer(answer).when(spyFirstLedger).readUnconfirmedAsync(anyLong(), anyLong());

// Trigger a "cancelPendingRead", but the operation will do nothing because of there is no pending read.
// - after removing all consumers and adding a new consumer, broker will call a "cursor.cancelPendingRead"
// and "cursor.rewind".
consumer1.close();
consumer2.close();
int queueSize3 = "testRewindImmediately".equals(testCase) ? 0 : 100;
ConsumerImpl<Integer> consumer3 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) // 4
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(queueSize3)
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("c3")
.subscribe();

// Verify 1: "cursor.cancelPendingReadRequest" will not cancel the pending replay read.
// This motivation of this verify is here: https://github.com/apache/pulsar/pull/23855#issuecomment-2597522865.
if ("testCancelPendingReadWillNotCancelReplay".equals(testCase)) {
Awaitility.await().untilAsserted(() -> {
assertTrue(dispatcher.isHavePendingRead());
});
}
// Verify 2: "cursor.rewind" will be called immediately, without wait for the next replay read.
if ("testRewindImmediately".equals(testCase)) {
Awaitility.await().untilAsserted(() -> {
log.info("cursor rd-pos: {}, md-pos: {}", dispatcher.getCursor().getReadPosition(),
dispatcher.getCursor()
.getMarkDeletedPosition());
assertEquals(dispatcher.getCursor().getReadPosition(), ml.getNextValidPosition(dispatcher.getCursor()
.getMarkDeletedPosition()));
});
}

// Verify 3: all messages(including "replay red" and "normal read") will be received in order.
for (int i = 0; i < 20; i++) {
producer.newMessage().key(k1).value(msgGeneratorForK1.incrementAndGet()).send();
messagesThatWillBeAckedAtLast.add(msgGeneratorForK1.get());
}
replyReadSignal.countDown();
if (queueSize3 > 2) {
Awaitility.await().untilAsserted(() -> {
assertTrue(consumer3.numMessagesInQueue() > 2);
});
}
Thread.sleep(1000 * 3);
List<Integer> messagesReceived = new ArrayList<>();
while (true) {
if (messagesReceived.size() < messagesThatWillBeAckedAtLast.size()) {
Message<Integer> msg = consumer3.receive();
messagesReceived.add(msg.getValue());
consumer3.acknowledge(msg);
} else {
break;
}
}
assertEquals(messagesReceived, messagesThatWillBeAckedAtLast);
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getCursor().getMarkDeletedPosition(), ml.getLastConfirmedEntry());
});

// cleanup.
producer.close();
consumer3.close();
admin.topics().delete(topic, false);
}

@Test(timeOut = 180 * 1000, invocationCount = 1)
public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Random;
import java.util.Set;
import java.util.function.BiFunction;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -97,6 +98,7 @@ protected <T> ReceivedMessages<T> ackAllMessages(Consumer<T>...consumers) throws

protected static class ReceivedMessages<T> {

@Getter
List<Pair<MessageId, T>> messagesReceived = Collections.synchronizedList(new ArrayList<>());

List<Pair<MessageId, T>> messagesAcked = Collections.synchronizedList(new ArrayList<>());
Expand Down

0 comments on commit 8e0947d

Please sign in to comment.