diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295fb52..ef957febc65497 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.admin.impl.BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import com.google.common.annotations.VisibleForTesting; @@ -72,6 +73,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.Offloaders; @@ -438,6 +440,12 @@ public void close() throws PulsarServerException { } } + protected boolean isManagedLedgerNotFoundException(Throwable e) { + Throwable realCause = e.getCause(); + return realCause instanceof ManagedLedgerException.MetadataNotFoundException + || realCause instanceof MetadataStoreException.NotFoundException; + } + /** * Close the current pulsar service. All resources are released. */ @@ -453,6 +461,37 @@ public CompletableFuture closeAsync() { } state = State.Closing; + // forcefully delete heartbeat topic when close broker + if (this.brokerService != null) { + LOG.info("forcefully delete heartbeat topic when close broker"); + NamespaceName namespaceNameV1 = + NamespaceService.getHeartbeatNamespace(getAdvertisedAddress(), getConfiguration()); + NamespaceName namespaceNameV2 = + NamespaceService.getHeartbeatNamespaceV2(getAdvertisedAddress(), getConfiguration()); + String heartbeatTopicNameV1 = String.format("persistent://%s/%s", namespaceNameV1, HEALTH_CHECK_TOPIC_SUFFIX); + String heartbeatTopicNameV2 = String.format("persistent://%s/%s", namespaceNameV2, HEALTH_CHECK_TOPIC_SUFFIX); + + try { + this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get(); + } catch (Exception e) { + if (!isManagedLedgerNotFoundException(e)) { + LOG.error("Closed with errors in delete heartbeat topic [{}]", + heartbeatTopicNameV1, e); + } + } + + try { + this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get(); + } catch (Exception e) { + if (!isManagedLedgerNotFoundException(e)) { + LOG.error("Closed with errors in delete heartbeat topic [{}]", + heartbeatTopicNameV2, e); + } + } + + LOG.info("finish forcefully delete heartbeat topic when close broker"); + } + // close the service in reverse order v.s. in which they are started if (this.resourceUsageTransportManager != null) { try {