diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295fb52..12ce3b32d86851 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.admin.impl.BrokersBase.getHeartbeatTopicName; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; @@ -72,6 +73,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.Offloaders; @@ -414,6 +416,41 @@ private void closeLeaderElectionService() throws Exception { } } + private boolean isManagedLedgerNotFoundException(Throwable e) { + Throwable realCause = e.getCause(); + return realCause instanceof ManagedLedgerException.MetadataNotFoundException + || realCause instanceof MetadataStoreException.NotFoundException; + } + + private void deleteHeartbeatResource() { + if (this.brokerService != null) { + LOG.info("forcefully delete heartbeat topic when close broker"); + + String heartbeatTopicNameV1 = getHeartbeatTopicName(getAdvertisedAddress(), getConfiguration(), false); + String heartbeatTopicNameV2 = getHeartbeatTopicName(getAdvertisedAddress(), getConfiguration(), true); + + try { + this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get(); + } catch (Exception e) { + if (!isManagedLedgerNotFoundException(e)) { + LOG.error("Closed with errors in delete heartbeat topic [{}]", + heartbeatTopicNameV1, e); + } + } + + try { + this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get(); + } catch (Exception e) { + if (!isManagedLedgerNotFoundException(e)) { + LOG.error("Closed with errors in delete heartbeat topic [{}]", + heartbeatTopicNameV2, e); + } + } + + LOG.info("finish forcefully delete heartbeat topic when close broker"); + } + } + @Override public void close() throws PulsarServerException { try { @@ -453,6 +490,9 @@ public CompletableFuture closeAsync() { } state = State.Closing; + // forcefully delete heartbeat topic when close broker + deleteHeartbeatResource(); + // close the service in reverse order v.s. in which they are started if (this.resourceUsageTransportManager != null) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 83067e9f296ef6..80f793654e4355 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -404,13 +404,17 @@ private void checkDeadlockedThreads() { } } + public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) { + NamespaceName namespaceName = isV2 + ? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration) + : NamespaceService.getHeartbeatNamespace(brokerId, configuration); + return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); + } private CompletableFuture internalRunHealthCheck(TopicVersion topicVersion) { String brokerId = pulsar().getBrokerId(); - NamespaceName namespaceName = (topicVersion == TopicVersion.V2) - ? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration()) - : NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration()); - final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); + final String topicName = + getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2)); LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName); final String messageStr = UUID.randomUUID().toString(); final String subscriptionName = "healthCheck-" + messageStr;