Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager #23349

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7e544b7
Add tests to reproduce
BewareMyPower Sep 24, 2024
d7b2d80
Fix Free event cannot be sent
BewareMyPower Sep 24, 2024
590e8dd
Close TopicPoliciesService before unload
BewareMyPower Sep 24, 2024
c4de2c6
Avoid blocking the thread that the topic policies reader is created
BewareMyPower Sep 24, 2024
810bd04
Tombstone the bundle gracefully
BewareMyPower Sep 24, 2024
c6540d9
Fix load data store related issues
BewareMyPower Sep 25, 2024
9b5d390
Cleanup BK for each test
BewareMyPower Sep 25, 2024
95bc705
Restore handleSkippedEvent and revert the change to shouldKeepLeft
BewareMyPower Sep 25, 2024
a9e9893
Fail all the lookup requests after clearOwnerships() is called
BewareMyPower Sep 25, 2024
ca51fda
Add enum state to replace started and disabling
BewareMyPower Sep 26, 2024
15fee17
Reuse the Disabled channel state to skip handling all events
BewareMyPower Sep 26, 2024
774e32d
Revert "Restore handleSkippedEvent and revert the change to shouldKee…
BewareMyPower Sep 26, 2024
0e4ccc7
Don't handle Free events specially in ServiceUnitStateDataConflictRes…
BewareMyPower Sep 26, 2024
8589e8b
Allow getOwnedServiceUnits() in DISABLED state
BewareMyPower Sep 26, 2024
bdff856
Stop load data report tasks in disableBroker()
BewareMyPower Sep 26, 2024
79fa08d
Remove unused field
BewareMyPower Sep 26, 2024
83a0b88
Fix LoadDataStoreTest#testShutdown
BewareMyPower Sep 26, 2024
e0fd852
Fix wrong logs
BewareMyPower Sep 26, 2024
7d31290
Revert changes on closeInternalTopics
BewareMyPower Sep 26, 2024
8068f1d
Skip scheduleCleanup if the state is Disabled
BewareMyPower Sep 26, 2024
09f40ff
Revert Free events handling on waitForCleanups
BewareMyPower Sep 26, 2024
d429420
Add log for possible reason that blocks the waitForCleanups
BewareMyPower Sep 26, 2024
5ad8309
Fix regression that does not swallow the exception from filters
BewareMyPower Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
Expand Down Expand Up @@ -633,10 +636,6 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
private SplitManager splitManager;

volatile boolean started = false;
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
boolean disabling = false;

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -808,6 +809,9 @@ public static boolean isInternalTopic(String topic) {

@VisibleForTesting
synchronized void playLeader() {
if (disabling) {
return;
}
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand Down Expand Up @@ -835,6 +839,10 @@ synchronized void playLeader() {
}
break;
} catch (Throwable e) {
if (disabling) {
log.warn("The broker:{} failed to playLeader, exit because it's disabled", pulsar.getBrokerId());
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -846,6 +854,9 @@ synchronized void playLeader() {
}
}
}
if (disabling) {
return;
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
Expand All @@ -863,6 +874,9 @@ synchronized void playLeader() {

@VisibleForTesting
synchronized void playFollower() {
if (disabling) {
return;
}
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -885,6 +899,10 @@ synchronized void playFollower() {
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
if (disabling) {
log.warn("The broker:{} failed to playFollower, exit because it's disabled", pulsar.getBrokerId());
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -896,6 +914,9 @@ synchronized void playFollower() {
}
}
}
if (disabling) {
return;
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
Expand Down Expand Up @@ -982,9 +1003,19 @@ protected void monitor() {
}

public void disableBroker() throws Exception {
// TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
// or playLeader() quickly.
synchronized (this) {
disabling = true;
}
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
if (availableBrokers.isEmpty()) {
close();
}
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ public CompletableFuture<Boolean> isChannelOwnerAsync() {
if (owner.isPresent()) {
return isTargetBroker(owner.get());
} else {
String msg = "There is no channel owner now.";
log.error(msg);
throw new IllegalStateException(msg);
throw new IllegalStateException("There is no channel owner now.");
}
});
}
Expand Down Expand Up @@ -851,7 +849,7 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
}
}

private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
private CompletableFuture<Void> handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
Expand All @@ -865,8 +863,10 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
return future;
} else {
stateChangeListeners.notify(serviceUnit, data, null);
return CompletableFuture.completedFuture(null);
}
}

Expand Down Expand Up @@ -1273,7 +1273,11 @@ private void handleBrokerDeletionEvent(String broker) {
return;
}
} catch (Exception e) {
log.error("Failed to handle broker deletion event.", e);
if (e instanceof ExecutionException && e.getCause() instanceof IllegalStateException) {
log.warn("Failed to handle broker deletion event due to {}", e.getMessage());
} else {
log.error("Failed to handle broker deletion event.", e);
}
return;
}
MetadataState state = getMetadataState();
Expand Down Expand Up @@ -1381,8 +1385,10 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
long started = System.currentTimeMillis();
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
final var futures = new HashMap<String, CompletableFuture<Void>>();
while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
boolean cleaned = true;
futures.clear();
for (var etr : tableview.entrySet()) {
var serviceUnit = etr.getKey();
var data = etr.getValue();
Expand All @@ -1391,7 +1397,9 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
continue;
}

if (data.state() == Owned && broker.equals(data.dstBroker())) {
if (data.state() == Free) {
futures.put(serviceUnit, handleFreeEvent(serviceUnit, data));
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
} else if (data.state() == Owned && broker.equals(data.dstBroker())) {
cleaned = false;
break;
}
Expand All @@ -1400,13 +1408,29 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
break;
} else {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
brokerId);
} catch (ExecutionException e) {
log.error("Failed to flush table view", e.getCause());
} catch (TimeoutException e) {
log.warn("Failed to flush the table view in {} ms", OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
}
}
}
var waitTimeMs = started + maxWaitTimeInMillis - System.currentTimeMillis();
if (waitTimeMs < 0) {
waitTimeMs = 0;
}
try {
FutureUtil.waitForAll(futures.values()).get(waitTimeMs, MILLISECONDS);
} catch (ExecutionException e) {
log.error("Failed to tombstone {}", futures.keySet(), e.getCause());
} catch (TimeoutException __) {
log.warn("Failed to tombstone {} in {} ms", futures.keySet(), waitTimeMs);
} catch (InterruptedException ignored) {
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}
Expand All @@ -1428,6 +1452,11 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
try {
tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
} catch (Exception e) {
log.error("Failed to flush", e);
}
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public record ServiceUnitStateData(

public ServiceUnitStateData {
Objects.requireNonNull(state);
if (StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) {
heesung-sn marked this conversation as resolved.
Show resolved Hide resolved
if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) {
throw new IllegalArgumentException("Empty broker");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void checkBrokers(boolean check) {

@Override
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
if (to == null) {
if (to == null || to.state() == ServiceUnitState.Free) {
return false;
}

Expand Down Expand Up @@ -145,4 +145,4 @@ private boolean invalidUnload(ServiceUnitStateData from, ServiceUnitStateData to
|| !from.dstBroker().equals(to.sourceBroker())
|| from.dstBroker().equals(to.dstBroker());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -144,8 +145,13 @@ public CompletableFuture<Void> put(String key, ServiceUnitStateData value) {
.sendAsync()
.whenComplete((messageId, e) -> {
if (e != null) {
log.error("Failed to publish the message: serviceUnit:{}, data:{}",
key, value, e);
if (e instanceof PulsarClientException.AlreadyClosedException) {
log.info("Skip publishing the message since the producer is closed, serviceUnit: {}, data: "
+ "{}", key, value);
} else {
log.error("Failed to publish the message: serviceUnit:{}, data:{}",
key, value, e);
}
future.completeExceptionally(e);
} else {
future.complete(null);
Expand All @@ -159,7 +165,14 @@ public void flush(long waitDurationInMillis) throws InterruptedException, Timeou
if (!isValidState()) {
throw new IllegalStateException(INVALID_STATE_ERROR_MSG);
}
producer.flushAsync().get(waitDurationInMillis, MILLISECONDS);
final var deadline = System.currentTimeMillis() + waitDurationInMillis;
var waitTimeMs = waitDurationInMillis;
producer.flushAsync().get(waitTimeMs, MILLISECONDS);
waitTimeMs = deadline - System.currentTimeMillis();
if (waitTimeMs < 0) {
waitTimeMs = 0;
}
tableview.refreshAsync().get(waitTimeMs, MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
LoadManagerContext context) {
int loadBalancerBrokerMaxTopics = context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
Optional<BrokerLoadData> brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
final Optional<BrokerLoadData> brokerLoadDataOpt;
try {
brokerLoadDataOpt = context.brokerLoadDataStore().get(broker);
} catch (IllegalStateException ignored) {
return false;
}
long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L);
// TODO: The broker load data might be delayed, so the max topic check might not accurate.
return topics >= loadBalancerBrokerMaxTopics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicNam
// initialization by calling this method. At the moment, the load manager does not start so the lookup
// for "__change_events" will fail. In this case, just return an empty policies to avoid deadlock.
final var loadManager = pulsarService.getLoadManager().get();
if (loadManager == null || !loadManager.started()) {
if (loadManager == null || !loadManager.started() || closed.get()) {
return CompletableFuture.completedFuture(Optional.empty());
}
final CompletableFuture<Boolean> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
Expand Down Expand Up @@ -308,6 +308,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
@VisibleForTesting
@Nonnull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
requireNonNull(namespace);
if (closed.get()) {
return CompletableFuture.completedFuture(false);
}
return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(namespacePolicies -> {
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
Expand All @@ -331,6 +334,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
});
initFuture.exceptionally(ex -> {
try {
if (closed.get()) {
return null;
}
log.error("[{}] Failed to create reader on __change_events topic",
namespace, ex);
cleanCacheAndCloseReader(namespace, false);
Expand Down Expand Up @@ -681,14 +687,22 @@ public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
writerCaches.synchronous().invalidateAll();
readerCaches.values().forEach(future -> {
if (future != null && !future.isCompletedExceptionally()) {
future.thenAccept(reader -> {
try {
reader.close();
} catch (Exception e) {
log.error("Failed to close reader.", e);
}
});
try {
final var reader = future.getNow(null);
if (reader != null) {
reader.close();
log.info("Closed the reader for topic policies");
} else {
// Avoid blocking the thread that the reader is created
future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> {
if (e == null) {
log.info("Closed the reader for topic policies");
} else {
log.error("Failed to close the reader for topic policies", e);
}
});
}
} catch (Throwable ignored) {
}
});
readerCaches.clear();
Expand Down
Loading
Loading