Skip to content

Commit

Permalink
[fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3r…
Browse files Browse the repository at this point in the history
…d attempt (#20429)
  • Loading branch information
lhotari authored May 29, 2023
1 parent bdd1bf1 commit a26cf3e
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -821,20 +822,23 @@ public void testMsgDropStat() throws Exception {

int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
try {
final String topicName = "non-persistent://my-property/my-ns/stats-topic";
final String topicName = BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
// restart broker with lower publish rate limit
conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
stopBroker();
startBroker();

pulsar.getBrokerService().updateRates();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
.receiverQueueSize(1).subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
Expand All @@ -861,12 +865,12 @@ public void testMsgDropStat() throws Exception {
});
});
}
latch.await(5, TimeUnit.SECONDS);
assertTrue(latch.await(5, TimeUnit.SECONDS));

NonPersistentTopic topic =
(NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();

Awaitility.await().untilAsserted(() -> {
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats(false, false, false);
NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
Expand All @@ -877,9 +881,6 @@ public void testMsgDropStat() throws Exception {
assertTrue(sub2Stats.getMsgDropRate() > 0);
});

producer.close();
consumer.close();
consumer2.close();
} finally {
conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
}
Expand Down

0 comments on commit a26cf3e

Please sign in to comment.