From 136e2b6546520687cdf9b78de352cd8591f6e002 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Thu, 19 Oct 2023 10:18:12 +0800 Subject: [PATCH] [fix][broker] Fix heartbeat namespace create transaction internal topic (#21348) (cherry picked from commit c8a2f49c6c6edaf6b5667f9ac7df65b815aefe58) --- .../service/persistent/PersistentTopic.java | 3 ++- .../systopic/PartitionedSystemTopicTest.java | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4a9d0069c9e0d..f5c0ecad64dd6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,7 +295,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS checkReplicatedSubscriptionControllerState(); TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(topicName)) { + && !isEventSystemTopic(topicName) + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 3cd25d3b00c67..f593edc20956e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -76,6 +76,7 @@ protected void setup() throws Exception { conf.setDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setTransactionCoordinatorEnabled(true); super.baseSetup(); } @@ -202,6 +203,24 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { }); } + @Test + public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", + namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + Optional optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + } + @Test public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test";