Skip to content

Commit

Permalink
fix can not cleanup heartbeat data if scaling down broker
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed May 22, 2024
1 parent 6372b9c commit 20f3895
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.BrokersBase.getHeartbeatTopicName;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -72,6 +73,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
Expand Down Expand Up @@ -414,6 +416,41 @@ private void closeLeaderElectionService() throws Exception {
}
}

private boolean isManagedLedgerNotFoundException(Throwable e) {
Throwable realCause = e.getCause();
return realCause instanceof ManagedLedgerException.MetadataNotFoundException
|| realCause instanceof MetadataStoreException.NotFoundException;
}

private void deleteHeartbeatResource() {
if (this.brokerService != null) {
LOG.info("forcefully delete heartbeat topic when close broker");

String heartbeatTopicNameV1 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), false);
String heartbeatTopicNameV2 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), true);

try {
this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get();
} catch (Exception e) {
if (!isManagedLedgerNotFoundException(e)) {
LOG.error("Closed with errors in delete heartbeat topic [{}]",
heartbeatTopicNameV1, e);
}
}

try {
this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get();
} catch (Exception e) {
if (!isManagedLedgerNotFoundException(e)) {
LOG.error("Closed with errors in delete heartbeat topic [{}]",
heartbeatTopicNameV2, e);
}
}

LOG.info("finish forcefully delete heartbeat topic when close broker");
}
}

@Override
public void close() throws PulsarServerException {
try {
Expand Down Expand Up @@ -453,6 +490,11 @@ public CompletableFuture<Void> closeAsync() {
}
state = State.Closing;

if (brokerId != null) {
// forcefully delete heartbeat topic when close broker
deleteHeartbeatResource();
}

// close the service in reverse order v.s. in which they are started
if (this.resourceUsageTransportManager != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,17 @@ private void checkDeadlockedThreads() {
}
}

public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) {
NamespaceName namespaceName = isV2
? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration)
: NamespaceService.getHeartbeatNamespace(brokerId, configuration);
return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
}

private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
String brokerId = pulsar().getBrokerId();
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration());
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
final String topicName =
getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
Expand Down

0 comments on commit 20f3895

Please sign in to comment.