diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dfeb03a254698..c55ee60c3089c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -310,7 +310,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(topicName)) { + && !isEventSystemTopic(topicName) + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 4af0bd9052391..42d941e616809 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -81,6 +81,7 @@ protected void setup() throws Exception { conf.setDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setTransactionCoordinatorEnabled(true); super.baseSetup(); } @@ -207,6 +208,24 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { }); } + @Test + public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", + namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + Optional optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + } + @Test public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test";