Skip to content

Commit

Permalink
fix can not cleanup heartbeat data if scaling down broker
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed May 21, 2024
1 parent 6372b9c commit 7a3e508
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
Expand Down Expand Up @@ -438,6 +440,12 @@ public void close() throws PulsarServerException {
}
}

protected boolean isManagedLedgerNotFoundException(Throwable e) {
Throwable realCause = e.getCause();
return realCause instanceof ManagedLedgerException.MetadataNotFoundException
|| realCause instanceof MetadataStoreException.NotFoundException;
}

/**
* Close the current pulsar service. All resources are released.
*/
Expand All @@ -453,6 +461,37 @@ public CompletableFuture<Void> closeAsync() {
}
state = State.Closing;

// forcefully delete heartbeat topic when close broker
if (this.brokerService != null) {
LOG.info("forcefully delete heartbeat topic when close broker");
NamespaceName namespaceNameV1 =
NamespaceService.getHeartbeatNamespace(getAdvertisedAddress(), getConfiguration());
NamespaceName namespaceNameV2 =
NamespaceService.getHeartbeatNamespaceV2(getAdvertisedAddress(), getConfiguration());
String heartbeatTopicNameV1 = String.format("persistent://%s/%s", namespaceNameV1, HEALTH_CHECK_TOPIC_SUFFIX);
String heartbeatTopicNameV2 = String.format("persistent://%s/%s", namespaceNameV2, HEALTH_CHECK_TOPIC_SUFFIX);

try {
this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get();
} catch (Exception e) {
if (!isManagedLedgerNotFoundException(e)) {
LOG.error("Closed with errors in delete heartbeat topic [{}]",
heartbeatTopicNameV1, e);
}
}

try {
this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get();
} catch (Exception e) {
if (!isManagedLedgerNotFoundException(e)) {
LOG.error("Closed with errors in delete heartbeat topic [{}]",
heartbeatTopicNameV2, e);
}
}

LOG.info("finish forcefully delete heartbeat topic when close broker");
}

// close the service in reverse order v.s. in which they are started
if (this.resourceUsageTransportManager != null) {
try {
Expand Down

0 comments on commit 7a3e508

Please sign in to comment.