Skip to content

Commit

Permalink
[fix][broker] Fix heartbeat namespace create transaction internal top…
Browse files Browse the repository at this point in the history
…ic (#21348)
  • Loading branch information
TakaHiR07 authored Oct 19, 2023
1 parent 5af821d commit c8a2f49
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS

TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)) {
&& !isEventSystemTopic(topicName)
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected void setup() throws Exception {
conf.setDefaultNumPartitions(PARTITIONS);
conf.setManagedLedgerMaxEntriesPerLedger(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setTransactionCoordinatorEnabled(true);

super.baseSetup();
}
Expand Down Expand Up @@ -207,6 +208,24 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception {
});
}

@Test
public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(),
pulsar.getConfig());
TopicName topicName = TopicName.get("persistent",
namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
Optional<Topic> optionalTopic = pulsar.getBrokerService()
.getTopic(topicName.getPartition(1).toString(), false).join();
Assert.assertTrue(optionalTopic.isEmpty());

List<String> topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
Assert.assertEquals(topics.size(), 1);
TopicName heartbeatTopicName = TopicName.get("persistent",
namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX);
Assert.assertEquals(topics.get(0), heartbeatTopicName.toString());
}

@Test
public void testSetBacklogCausedCreatingProducerFailure() throws Exception {
final String ns = "prop/ns-test";
Expand Down

0 comments on commit c8a2f49

Please sign in to comment.