Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager #23349

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
Expand Down Expand Up @@ -633,10 +636,6 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
private SplitManager splitManager;

volatile boolean started = false;
boolean disabling = false;

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -808,6 +809,9 @@ public static boolean isInternalTopic(String topic) {

@VisibleForTesting
synchronized void playLeader() {
if (disabling) {
return;
}
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand Down Expand Up @@ -835,6 +839,10 @@ synchronized void playLeader() {
}
break;
} catch (Throwable e) {
if (disabling) {
log.warn("The broker:{} failed to playLeader, exit because it's disabled", pulsar.getBrokerId());
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -846,6 +854,9 @@ synchronized void playLeader() {
}
}
}
if (disabling) {
return;
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
Expand All @@ -863,6 +874,9 @@ synchronized void playLeader() {

@VisibleForTesting
synchronized void playFollower() {
if (disabling) {
return;
}
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -885,6 +899,10 @@ synchronized void playFollower() {
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
if (disabling) {
log.warn("The broker:{} failed to playFollower, exit because it's disabled", pulsar.getBrokerId());
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -896,6 +914,9 @@ synchronized void playFollower() {
}
}
}
if (disabling) {
return;
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
Expand Down Expand Up @@ -982,9 +1003,19 @@ protected void monitor() {
}

public void disableBroker() throws Exception {
// TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower()
// or playLeader() quickly.
synchronized (this) {
disabling = true;
}
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
if (availableBrokers.isEmpty()) {
close();
}
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private volatile long lastOwnEventHandledAt = 0;
private long lastOwnedServiceUnitCountAt = 0;
private int totalOwnedServiceUnitCnt = 0;
private volatile boolean disablePubOwnedEvent = false;

public enum EventType {
Assign,
Expand Down Expand Up @@ -255,6 +256,7 @@ public void cancelOwnershipMonitor() {

@Override
public void cleanOwnerships() {
disablePubOwnedEvent = true;
doCleanup(brokerId, true);
}

Expand Down Expand Up @@ -307,7 +309,7 @@ public synchronized void start() throws PulsarServerException {
pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles());

tableview = createServiceUnitStateTableView();
tableview.start(pulsar, this::handleEvent, this::handleExisting);
tableview.start(pulsar, this::handleEvent, this::handleExisting, this::handleSkippedEvent);

if (debug) {
log.info("Successfully started the channel tableview.");
Expand Down Expand Up @@ -412,9 +414,7 @@ public CompletableFuture<Boolean> isChannelOwnerAsync() {
if (owner.isPresent()) {
return isTargetBroker(owner.get());
} else {
String msg = "There is no channel owner now.";
log.error(msg);
throw new IllegalStateException(msg);
throw new IllegalStateException("There is no channel owner now.");
}
});
}
Expand Down Expand Up @@ -774,7 +774,11 @@ brokerId, getLogEventTag(data), serviceUnit,
}
}

private void handleSkippedEvent(String serviceUnit) {
private void handleSkippedEvent(String serviceUnit, ServiceUnitStateData skippedData) {
if (skippedData.state() == Free) {
handleFreeEvent(serviceUnit, skippedData);
return;
}
var getOwnerRequest = getOwnerRequests.get(serviceUnit);
if (getOwnerRequest != null) {
var data = tableview.get(serviceUnit);
Expand Down Expand Up @@ -815,6 +819,15 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {

private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.dstBroker())) {
if (disablePubOwnedEvent) {
log.info("Skip assigning self({}) as the owner after cleanOwnerships", serviceUnit);
final var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(
"lookup during ownership cleanup"));
}
return;
}
ServiceUnitStateData next = new ServiceUnitStateData(
Owned, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
stateChangeListeners.notifyOnCompletion(pubAsync(serviceUnit, next), serviceUnit, data)
Expand Down Expand Up @@ -851,7 +864,7 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
}
}

private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
private CompletableFuture<Void> handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
Expand All @@ -865,8 +878,10 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
return future;
} else {
stateChangeListeners.notify(serviceUnit, data, null);
return CompletableFuture.completedFuture(null);
}
}

Expand Down Expand Up @@ -1273,7 +1288,11 @@ private void handleBrokerDeletionEvent(String broker) {
return;
}
} catch (Exception e) {
log.error("Failed to handle broker deletion event.", e);
if (e instanceof ExecutionException && e.getCause() instanceof IllegalStateException) {
log.warn("Failed to handle broker deletion event due to {}", e.getMessage());
} else {
log.error("Failed to handle broker deletion event.", e);
}
return;
}
MetadataState state = getMetadataState();
Expand Down Expand Up @@ -1381,8 +1400,10 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
long started = System.currentTimeMillis();
final var futures = new HashMap<String, CompletableFuture<Void>>();
while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
boolean cleaned = true;
futures.clear();
for (var etr : tableview.entrySet()) {
var serviceUnit = etr.getKey();
var data = etr.getValue();
Expand All @@ -1391,7 +1412,9 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
continue;
}

if (data.state() == Owned && broker.equals(data.dstBroker())) {
if (data.state() == Free) {
futures.put(serviceUnit, handleFreeEvent(serviceUnit, data));
} else if (data.state() == Owned && broker.equals(data.dstBroker())) {
cleaned = false;
break;
}
Expand All @@ -1400,18 +1423,34 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
break;
} else {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}",
brokerId);
} catch (ExecutionException e) {
log.error("Failed to flush table view", e.getCause());
} catch (TimeoutException e) {
log.warn("Failed to flush the table view in {} ms", OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS);
}
}
}
var waitTimeMs = started + maxWaitTimeInMillis - System.currentTimeMillis();
if (waitTimeMs < 0) {
waitTimeMs = 0;
}
try {
FutureUtil.waitForAll(futures.values()).get(waitTimeMs, MILLISECONDS);
} catch (ExecutionException e) {
log.error("Failed to tombstone {}", futures.keySet(), e.getCause());
} catch (TimeoutException __) {
log.warn("Failed to tombstone {} in {} ms", futures.keySet(), waitTimeMs);
} catch (InterruptedException ignored) {
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker, boolean gracefully) {
private void doCleanup(String broker, boolean gracefully) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
Expand All @@ -1428,6 +1467,11 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
try {
tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
} catch (Exception e) {
log.error("Failed to flush", e);
}
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
for (var etr : tableview.entrySet()) {
var stateData = etr.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public record ServiceUnitStateData(

public ServiceUnitStateData {
Objects.requireNonNull(state);
if (StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) {
if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) {
throw new IllegalArgumentException("Empty broker");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,4 @@ private boolean invalidUnload(ServiceUnitStateData from, ServiceUnitStateData to
|| !from.dstBroker().equals(to.sourceBroker())
|| from.dstBroker().equals(to.dstBroker());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public class ServiceUnitStateMetadataStoreTableViewImpl extends ServiceUnitState

public void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData> tailItemListener,
BiConsumer<String, ServiceUnitStateData> existingItemListener)
BiConsumer<String, ServiceUnitStateData> existingItemListener,
BiConsumer<String, ServiceUnitStateData> skippedItemListener)
throws MetadataStoreException {
init(pulsar);
conflictResolver = new ServiceUnitStateDataConflictResolver();
conflictResolver.setStorageType(MetadataStore);
conflictResolver.setSkippedMsgHandler(skippedItemListener);
tableview = new MetadataStoreTableViewImpl<>(ServiceUnitStateData.class,
pulsar.getBrokerId(),
pulsar.getLocalMetadataStore(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.NamespaceBundle;

/**
Expand All @@ -37,6 +39,8 @@
* ServiceUnitStateTableView receives notifications whenever ownership states are updated in the remote store, and
* upon notification, it applies the updates to its local tableview with the listener logic.
*/
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate
public interface ServiceUnitStateTableView extends Closeable {

/**
Expand All @@ -46,11 +50,13 @@ public interface ServiceUnitStateTableView extends Closeable {
* @param pulsar pulsar service reference
* @param tailItemListener listener to listen tail(newly updated) items
* @param existingItemListener listener to listen existing items
* @param skippedItemListener listener for items that are skipped by the topic compaction strategy
* @throws IOException if it fails to init the tableview.
*/
void start(PulsarService pulsar,
BiConsumer<String, ServiceUnitStateData> tailItemListener,
BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException;
BiConsumer<String, ServiceUnitStateData> existingItemListener,
BiConsumer<String, ServiceUnitStateData> skippedItemListener) throws IOException;


/**
Expand Down
Loading
Loading