diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a2f6fb9e9773b..6c768a078974f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -513,6 +513,9 @@ public CompletableFuture closeAsync() { return closeFuture; } LOG.info("Closing PulsarService"); + if (topicPoliciesService != null) { + topicPoliciesService.close(); + } if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } @@ -633,10 +636,6 @@ public CompletableFuture closeAsync() { transactionBufferClient.close(); } - if (topicPoliciesService != null) { - topicPoliciesService.close(); - topicPoliciesService = null; - } if (client != null) { client.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 98ef6bf36edac..841f9bfb669d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -181,7 +181,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private SplitManager splitManager; - volatile boolean started = false; + enum State { + INIT, + RUNNING, + // It's removing visibility of the current broker from other brokers. In this state, it cannot play as a leader + // or follower. 
+ DISABLED, + } + private final AtomicReference state = new AtomicReference<>(State.INIT); private boolean configuredSystemTopics = false; @@ -214,7 +221,7 @@ public CompletableFuture> getOwnedServiceUnitsAsync() { } public Set getOwnedServiceUnits() { - if (!started) { + if (state.get() == State.INIT) { log.warn("Failed to get owned service units, load manager is not started."); return Collections.emptySet(); } @@ -344,7 +351,7 @@ public static CompletableFuture> getAssignedBrokerLoo @Override public void start() throws PulsarServerException { - if (this.started) { + if (state.get() != State.INIT) { return; } try { @@ -443,7 +450,9 @@ public void start() throws PulsarServerException { this.splitScheduler.start(); this.initWaiter.complete(true); - this.started = true; + if (!state.compareAndSet(State.INIT, State.RUNNING)) { + failForUnexpectedState("start"); + } log.info("Started load manager."); } catch (Throwable e) { failStarting(e); @@ -615,21 +624,17 @@ public CompletableFuture> selectAsync(ServiceUnitId bundle, filter.filterAsync(availableBrokerCandidates, bundle, context); futures.add(future); } - CompletableFuture> result = new CompletableFuture<>(); - FutureUtil.waitForAll(futures).whenComplete((__, ex) -> { - if (ex != null) { - // TODO: We may need to revisit this error case. - log.error("Failed to filter out brokers when select bundle: {}", bundle, ex); - } + return FutureUtil.waitForAll(futures).exceptionally(e -> { + // TODO: We may need to revisit this error case. 
+ log.error("Failed to filter out brokers when select bundle: {}", bundle, e); + return null; + }).thenApply(__ -> { if (availableBrokerCandidates.isEmpty()) { - result.complete(Optional.empty()); - return; + return Optional.empty(); } Set candidateBrokers = availableBrokerCandidates.keySet(); - - result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, context)); + return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context); }); - return result; }); } @@ -667,6 +672,9 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, boolean force, long timeout, TimeUnit timeoutUnit) { + if (state.get() == State.INIT) { + return CompletableFuture.completedFuture(null); + } if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -755,24 +763,11 @@ private CompletableFuture splitAsync(SplitDecision decision, @Override public void close() throws PulsarServerException { - if (!this.started) { + if (state.get() == State.INIT) { return; } try { - if (brokerLoadDataReportTask != null) { - brokerLoadDataReportTask.cancel(true); - } - - if (topBundlesLoadDataReportTask != null) { - topBundlesLoadDataReportTask.cancel(true); - } - - if (monitorTask != null) { - monitorTask.cancel(true); - } - - this.brokerLoadDataStore.shutdown(); - this.topBundlesLoadDataStore.shutdown(); + stopLoadDataReportTasks(); this.unloadScheduler.close(); this.splitScheduler.close(); this.serviceUnitStateTableViewSyncer.close(); @@ -791,7 +786,7 @@ public void close() throws PulsarServerException { } catch (Exception e) { throw new PulsarServerException(e); } finally { - this.started = false; + state.set(State.INIT); } } @@ -799,6 +794,28 @@ public void close() throws PulsarServerException { } } + private void stopLoadDataReportTasks() { + if (brokerLoadDataReportTask != null) { + 
brokerLoadDataReportTask.cancel(true); + } + if (topBundlesLoadDataReportTask != null) { + topBundlesLoadDataReportTask.cancel(true); + } + if (monitorTask != null) { + monitorTask.cancel(true); + } + try { + brokerLoadDataStore.shutdown(); + } catch (IOException e) { + log.warn("Failed to shutdown brokerLoadDataStore", e); + } + try { + topBundlesLoadDataStore.shutdown(); + } catch (IOException e) { + log.warn("Failed to shutdown topBundlesLoadDataStore", e); + } + } + public static boolean isInternalTopic(String topic) { return INTERNAL_TOPICS.contains(topic) || topic.startsWith(TOPIC) @@ -814,13 +831,16 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - if (!initWaiter.get()) { + if (!initWaiter.get() || disabled()) { return; } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; } + if (disabled()) { + return; + } // Confirm the system topics have been created or create them if they do not exist. // If the leader has changed, the new leader need to reset // the local brokerService.topics (by this topic creations). @@ -835,6 +855,11 @@ synchronized void playLeader() { } break; } catch (Throwable e) { + if (disabled()) { + log.warn("The broker:{} failed to set the role but exit because it's disabled", + pulsar.getBrokerId(), e); + return; + } log.warn("The broker:{} failed to set the role. 
Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -846,6 +871,9 @@ synchronized void playLeader() { } } } + if (disabled()) { + return; + } if (becameFollower) { log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId()); @@ -869,13 +897,16 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - if (!initWaiter.get()) { + if (!initWaiter.get() || disabled()) { return; } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; } + if (disabled()) { + return; + } unloadScheduler.close(); serviceUnitStateChannel.cancelOwnershipMonitor(); closeInternalTopics(); @@ -885,6 +916,11 @@ synchronized void playFollower() { serviceUnitStateTableViewSyncer.close(); break; } catch (Throwable e) { + if (disabled()) { + log.warn("The broker:{} failed to set the role but exit because it's disabled", + pulsar.getBrokerId(), e); + return; + } log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -896,6 +932,9 @@ synchronized void playFollower() { } } } + if (disabled()) { + return; + } if (becameLeader) { log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId()); @@ -982,9 +1021,20 @@ protected void monitor() { } public void disableBroker() throws Exception { + // TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower() + // or playLeader() quickly. 
+ if (!state.compareAndSet(State.RUNNING, State.DISABLED)) { + failForUnexpectedState("disableBroker"); + } + stopLoadDataReportTasks(); serviceUnitStateChannel.cleanOwnerships(); - leaderElectionService.close(); brokerRegistry.unregister(); + leaderElectionService.close(); + final var availableBrokers = brokerRegistry.getAvailableBrokersAsync() + .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + if (availableBrokers.isEmpty()) { + close(); + } // Close the internal topics (if owned any) after giving up the possible leader role, // so that the subsequent lookups could hit the next leader. closeInternalTopics(); @@ -1018,4 +1068,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { return new ServiceUnitStateChannelImpl(pulsar); } + + private void failForUnexpectedState(String msg) { + throw new IllegalStateException("Failed to " + msg + ", state: " + state.get()); + } + + boolean running() { + return state.get() == State.RUNNING; + } + + private boolean disabled() { + return state.get() == State.DISABLED; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 6a48607977ba9..35f6cfcbcf549 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -51,7 +51,7 @@ public void start() throws PulsarServerException { } public boolean started() { - return loadManager.started && loadManager.getServiceUnitStateChannel().started(); + return loadManager.running() && loadManager.getServiceUnitStateChannel().started(); } @Override diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ddbc9eacac921..ce975495feb2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -255,6 +255,7 @@ public void cancelOwnershipMonitor() { @Override public void cleanOwnerships() { + disable(); doCleanup(brokerId, true); } @@ -412,9 +413,7 @@ public CompletableFuture isChannelOwnerAsync() { if (owner.isPresent()) { return isTargetBroker(owner.get()); } else { - String msg = "There is no channel owner now."; - log.error(msg); - throw new IllegalStateException(msg); + throw new IllegalStateException("There is no channel owner now."); } }); } @@ -679,11 +678,15 @@ private void handleEvent(String serviceUnit, ServiceUnitStateData data) { brokerId, serviceUnit, data, totalHandledRequests); } - if (channelState == Disabled) { + ServiceUnitState state = state(data); + if (channelState == Disabled && (data == null || !data.force())) { + final var request = getOwnerRequests.remove(serviceUnit); + if (request != null) { + request.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "cancel the lookup request for " + serviceUnit + " when receiving " + state)); + } return; } - - ServiceUnitState state = state(data); try { switch (state) { case Owned -> handleOwnEvent(serviceUnit, data); @@ -851,7 +854,7 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { } } - private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { + private CompletableFuture handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = 
getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { getOwnerRequest.complete(null); @@ -865,8 +868,10 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { : CompletableFuture.completedFuture(0)).thenApply(__ -> null); stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + return future; } else { stateChangeListeners.notify(serviceUnit, data, null); + return CompletableFuture.completedFuture(null); } } @@ -1273,7 +1278,11 @@ private void handleBrokerDeletionEvent(String broker) { return; } } catch (Exception e) { - log.error("Failed to handle broker deletion event.", e); + if (e instanceof ExecutionException && e.getCause() instanceof IllegalStateException) { + log.warn("Failed to handle broker deletion event due to {}", e.getMessage()); + } else { + log.error("Failed to handle broker deletion event.", e); + } return; } MetadataState state = getMetadataState(); @@ -1293,6 +1302,11 @@ private void handleBrokerDeletionEvent(String broker) { private void scheduleCleanup(String broker, long delayInSecs) { var scheduled = new MutableObject>(); try { + final var channelState = this.channelState; + if (channelState == Disabled || channelState == Closed) { + log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); + return; + } cleanupJobs.computeIfAbsent(broker, k -> { Executor delayed = CompletableFuture .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); @@ -1393,6 +1407,7 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max if (data.state() == Owned && broker.equals(data.dstBroker())) { cleaned = false; + log.info("[{}] bundle {} is still owned by this, data: {}", broker, serviceUnit, data); break; } } @@ -1400,10 +1415,15 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max break; } else { try { - 
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); + tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS / 2); + Thread.sleep(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS / 2); } catch (InterruptedException e) { log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}", brokerId); + } catch (ExecutionException e) { + log.error("Failed to flush table view", e.getCause()); + } catch (TimeoutException e) { + log.warn("Failed to flush the table view in {} ms", OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS / 2); } } } @@ -1428,6 +1448,11 @@ private synchronized void doCleanup(String broker, boolean gracefully) { log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); + try { + tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); + } catch (Exception e) { + log.error("Failed to flush", e); + } Map orphanSystemServiceUnits = new HashMap<>(); for (var etr : tableview.entrySet()) { var stateData = etr.getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java index e85134e611632..4a990ddbc9b21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java @@ -34,7 +34,7 @@ public record ServiceUnitStateData( public ServiceUnitStateData { Objects.requireNonNull(state); - if (StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { + if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { throw new IllegalArgumentException("Empty broker"); } } diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java index b1dbb6fac8709..3e43237f4c00e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java @@ -145,4 +145,4 @@ private boolean invalidUnload(ServiceUnitStateData from, ServiceUnitStateData to || !from.dstBroker().equals(to.sourceBroker()) || from.dstBroker().equals(to.dstBroker()); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java index 8dfaddcdabca1..12cf87445a3dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.TopicDomain; @@ -144,8 +145,13 @@ public CompletableFuture put(String key, ServiceUnitStateData value) { .sendAsync() .whenComplete((messageId, e) -> { if (e != null) { - log.error("Failed to publish the message: serviceUnit:{}, data:{}", - key, value, e); + if (e instanceof 
PulsarClientException.AlreadyClosedException) { + log.info("Skip publishing the message since the producer is closed, serviceUnit: {}, data: " + + "{}", key, value); + } else { + log.error("Failed to publish the message: serviceUnit:{}, data:{}", + key, value, e); + } future.completeExceptionally(e); } else { future.complete(null); @@ -159,7 +165,14 @@ public void flush(long waitDurationInMillis) throws InterruptedException, Timeou if (!isValidState()) { throw new IllegalStateException(INVALID_STATE_ERROR_MSG); } - producer.flushAsync().get(waitDurationInMillis, MILLISECONDS); + final var deadline = System.currentTimeMillis() + waitDurationInMillis; + var waitTimeMs = waitDurationInMillis; + producer.flushAsync().get(waitTimeMs, MILLISECONDS); + waitTimeMs = deadline - System.currentTimeMillis(); + if (waitTimeMs < 0) { + waitTimeMs = 0; + } + tableview.refreshAsync().get(waitTimeMs, MILLISECONDS); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java index 48213c18e6376..9863d05ee751e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java @@ -41,7 +41,12 @@ public CompletableFuture> filterAsync(Map { - Optional brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + final Optional brokerLoadDataOpt; + try { + brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + } catch (IllegalStateException ignored) { + return false; + } long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L); // TODO: The broker load data might be delayed, so the max topic check might not accurate. 
return topics >= loadBalancerBrokerMaxTopics; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index c9d18676cfa99..3ce44a1e65a73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -92,7 +92,11 @@ public synchronized CompletableFuture removeAsync(String key) { public synchronized Optional get(String key) { String msg = validateTableView(); if (StringUtils.isNotBlank(msg)) { - throw new IllegalStateException(msg); + if (msg.equals(SHUTDOWN_ERR_MSG)) { + return Optional.empty(); + } else { + throw new IllegalStateException(msg); + } } return Optional.ofNullable(tableView.get(key)); } @@ -193,7 +197,9 @@ public synchronized void startProducer() throws LoadDataStoreException { @Override public synchronized void close() throws IOException { - validateState(); + if (isShutdown) { + return; + } closeProducer(); closeTableView(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 09f04d878c4e5..bfa99eedcadce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -973,7 +973,12 @@ public void unloadNamespaceBundlesGracefully(int maxConcurrentUnload, boolean cl pulsar.getNamespaceService().unloadNamespaceBundle(su, timeout, MILLISECONDS, closeWithoutWaitingClientDisconnect).get(timeout, MILLISECONDS); } catch (Exception e) { - log.warn("Failed to unload namespace bundle {}", su, e); + if (e instanceof ExecutionException + && 
e.getCause() instanceof ServiceUnitNotReadyException) { + log.warn("Failed to unload namespace bundle {}: {}", su, e.getMessage()); + } else { + log.warn("Failed to unload namespace bundle {}", su, e); + } } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 18b4c610a5c9b..6ff6408916b1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -254,7 +254,7 @@ public CompletableFuture> getTopicPoliciesAsync(TopicNam // initialization by calling this method. At the moment, the load manager does not start so the lookup // for "__change_events" will fail. In this case, just return an empty policies to avoid deadlock. final var loadManager = pulsarService.getLoadManager().get(); - if (loadManager == null || !loadManager.started()) { + if (loadManager == null || !loadManager.started() || closed.get()) { return CompletableFuture.completedFuture(Optional.empty()); } final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); @@ -308,6 +308,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { @VisibleForTesting @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); + if (closed.get()) { + return CompletableFuture.completedFuture(false); + } return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenCompose(namespacePolicies -> { if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { @@ -331,6 +334,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { }); initFuture.exceptionally(ex -> { try { + if (closed.get()) 
{ + return null; + } log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); cleanCacheAndCloseReader(namespace, false); @@ -681,14 +687,22 @@ public void close() throws Exception { if (closed.compareAndSet(false, true)) { writerCaches.synchronous().invalidateAll(); readerCaches.values().forEach(future -> { - if (future != null && !future.isCompletedExceptionally()) { - future.thenAccept(reader -> { - try { - reader.close(); - } catch (Exception e) { - log.error("Failed to close reader.", e); - } - }); + try { + final var reader = future.getNow(null); + if (reader != null) { + reader.close(); + log.info("Closed the reader for topic policies"); + } else { + // Avoid blocking the thread on which the reader is created; chain on closeAsync so its result is observed + future.thenCompose(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> { + if (e == null) { + log.info("Closed the reader for topic policies"); + } else { + log.error("Failed to close the reader for topic policies", e); + } + }); + } + } catch (Throwable ignored) { } }); readerCaches.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index 41413f3e3a913..fa63ce566c603 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -22,13 +22,15 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import
org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -36,26 +38,33 @@ import org.testng.annotations.Test; @Slf4j +@Test(groups = "broker") public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; - private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); private final List brokers = new ArrayList<>(); - private PulsarAdmin admin; + private LocalBookkeeperEnsemble bk; @BeforeClass(alwaysRun = true) public void setup() throws Exception { + bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); bk.start(); - for (int i = 0; i < 3; i++) { + } + + private void setupBrokers(int numBrokers) throws Exception { + brokers.clear(); + for (int i = 0; i < numBrokers; i++) { final var broker = new PulsarService(brokerConfig()); broker.start(); brokers.add(broker); } - admin = brokers.get(0).getAdminClient(); - admin.clusters().createCluster(clusterName, ClusterData.builder().build()); - admin.tenants().createTenant("public", TenantInfo.builder() - .allowedClusters(Collections.singleton(clusterName)).build()); - admin.namespaces().createNamespace("public/default"); + final var admin = brokers.get(0).getAdminClient(); + if (!admin.clusters().getClusters().contains(clusterName)) { + admin.clusters().createCluster(clusterName, ClusterData.builder().build()); + admin.tenants().createTenant("public", TenantInfo.builder() + .allowedClusters(Collections.singleton(clusterName)).build()); + admin.namespaces().createNamespace("public/default"); + } } @@ -85,7 +94,9 @@ private ServiceConfiguration brokerConfig() { @Test public void testCloseAfterLoadingBundles() throws Exception { + setupBrokers(3); final var topic = "test"; + final var admin = 
brokers.get(0).getAdminClient(); admin.topics().createPartitionedTopic(topic, 20); admin.lookups().lookupPartitionedTopic(topic); final var client = PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build(); @@ -104,4 +115,25 @@ public void testCloseAfterLoadingBundles() throws Exception { Assert.assertTrue(closeTimeMs < 5000L); } } + + @Test + public void testLookup() throws Exception { + setupBrokers(1); + final var topic = "test-lookup"; + final var numPartitions = 16; + final var admin = brokers.get(0).getAdminClient(); + admin.topics().createPartitionedTopic(topic, numPartitions); + + final var futures = new ArrayList>(); + for (int i = 0; i < numPartitions; i++) { + futures.add(admin.lookups().lookupTopicAsync(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + } + FutureUtil.waitForAll(futures).get(); + + final var start = System.currentTimeMillis(); + brokers.get(0).close(); + final var closeTimeMs = System.currentTimeMillis() - start; + log.info("Broker close time: {}", closeTimeMs); + Assert.assertTrue(closeTimeMs < 5000L); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java index 3267e67ad2c3e..820307637be67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java @@ -198,7 +198,7 @@ public void testShutdown() throws Exception { Assert.assertTrue(loadDataStore.pushAsync("2", 2).isCompletedExceptionally()); Assert.assertTrue(loadDataStore.removeAsync("2").isCompletedExceptionally()); - assertThrows(IllegalStateException.class, () -> loadDataStore.get("2")); + assertTrue(loadDataStore.get("2").isEmpty()); assertThrows(IllegalStateException.class, loadDataStore::size); 
assertThrows(IllegalStateException.class, loadDataStore::entrySet); assertThrows(IllegalStateException.class, () -> loadDataStore.forEach((k, v) -> {})); @@ -206,7 +206,6 @@ public void testShutdown() throws Exception { assertThrows(IllegalStateException.class, loadDataStore::start); assertThrows(IllegalStateException.class, loadDataStore::startProducer); assertThrows(IllegalStateException.class, loadDataStore::startTableView); - assertThrows(IllegalStateException.class, loadDataStore::close); assertThrows(IllegalStateException.class, loadDataStore::closeTableView); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index d5d4174ee10a9..4f52060497864 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -364,8 +364,8 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading existing messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading existing messages.", + reader.getTopic()); } else { log.warn("Reader {} was interrupted while reading existing messages. ", reader.getTopic(), ex); @@ -393,8 +393,7 @@ private void readTailMessages(Reader reader) { readTailMessages(reader); }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading tail messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); // Fail all refresh request when no more messages can be read. pendingRefreshRequests.keySet().forEach(future -> { pendingRefreshRequests.remove(future);