From 7e544b79112650dc56c370b51522beec1a7e94e5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 24 Sep 2024 17:57:12 +0800 Subject: [PATCH 1/9] Add tests to reproduce --- .../ExtensibleLoadManagerCloseTest.java | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index 41413f3e3a913..f345fae5d0a07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -22,13 +22,15 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -41,21 +43,26 @@ public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); private final List brokers = new ArrayList<>(); - private PulsarAdmin admin; @BeforeClass(alwaysRun = true) public void setup() throws Exception { bk.start(); - for (int i = 0; i < 3; i++) { + } + + private void setupBrokers(int numBrokers) throws Exception { + 
brokers.clear(); + for (int i = 0; i < numBrokers; i++) { final var broker = new PulsarService(brokerConfig()); broker.start(); brokers.add(broker); } - admin = brokers.get(0).getAdminClient(); - admin.clusters().createCluster(clusterName, ClusterData.builder().build()); - admin.tenants().createTenant("public", TenantInfo.builder() - .allowedClusters(Collections.singleton(clusterName)).build()); - admin.namespaces().createNamespace("public/default"); + final var admin = brokers.get(0).getAdminClient(); + if (!admin.clusters().getClusters().contains(clusterName)) { + admin.clusters().createCluster(clusterName, ClusterData.builder().build()); + admin.tenants().createTenant("public", TenantInfo.builder() + .allowedClusters(Collections.singleton(clusterName)).build()); + admin.namespaces().createNamespace("public/default"); + } } @@ -85,7 +92,9 @@ private ServiceConfiguration brokerConfig() { @Test public void testCloseAfterLoadingBundles() throws Exception { + setupBrokers(3); final var topic = "test"; + final var admin = brokers.get(0).getAdminClient(); admin.topics().createPartitionedTopic(topic, 20); admin.lookups().lookupPartitionedTopic(topic); final var client = PulsarClient.builder().serviceUrl(brokers.get(0).getBrokerServiceUrl()).build(); @@ -104,4 +113,25 @@ public void testCloseAfterLoadingBundles() throws Exception { Assert.assertTrue(closeTimeMs < 5000L); } } + + @Test + public void testLookup() throws Exception { + setupBrokers(1); + final var topic = "test-lookup"; + final var numPartitions = 16; + final var admin = brokers.get(0).getAdminClient(); + admin.topics().createPartitionedTopic(topic, numPartitions); + + final var futures = new ArrayList>(); + for (int i = 0; i < numPartitions; i++) { + futures.add(admin.lookups().lookupTopicAsync(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i)); + } + FutureUtil.waitForAll(futures).get(); + + final var start = System.currentTimeMillis(); + brokers.get(0).close(); + final var closeTimeMs = 
System.currentTimeMillis() - start; + log.info("Broker close time: {}", closeTimeMs); + Assert.assertTrue(closeTimeMs < 5000L); + } } From d7b2d80073bb92c2821a5722700badb5f9400c96 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 24 Sep 2024 22:26:25 +0800 Subject: [PATCH 2/9] Fix Free event cannot be sent --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 6 +++++- .../extensions/channel/ServiceUnitStateData.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ddbc9eacac921..a80b38c7df8af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1273,7 +1273,11 @@ private void handleBrokerDeletionEvent(String broker) { return; } } catch (Exception e) { - log.error("Failed to handle broker deletion event.", e); + if (e instanceof ExecutionException && e.getCause() instanceof IllegalStateException) { + log.warn("Failed to handle broker deletion event due to {}", e.getMessage()); + } else { + log.error("Failed to handle broker deletion event.", e); + } return; } MetadataState state = getMetadataState(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java index e85134e611632..4a990ddbc9b21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateData.java 
@@ -34,7 +34,7 @@ public record ServiceUnitStateData( public ServiceUnitStateData { Objects.requireNonNull(state); - if (StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { + if (state != ServiceUnitState.Free && StringUtils.isBlank(dstBroker) && StringUtils.isBlank(sourceBroker)) { throw new IllegalArgumentException("Empty broker"); } } From 590e8dd02edc3bf9ab82eca587279db71681adc0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 24 Sep 2024 22:38:49 +0800 Subject: [PATCH 3/9] Close TopicPoliciesService before unload --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 7 +++---- .../service/SystemTopicBasedTopicPoliciesService.java | 8 +++++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a2f6fb9e9773b..6c768a078974f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -513,6 +513,9 @@ public CompletableFuture closeAsync() { return closeFuture; } LOG.info("Closing PulsarService"); + if (topicPoliciesService != null) { + topicPoliciesService.close(); + } if (brokerService != null) { brokerService.unloadNamespaceBundlesGracefully(); } @@ -633,10 +636,6 @@ public CompletableFuture closeAsync() { transactionBufferClient.close(); } - if (topicPoliciesService != null) { - topicPoliciesService.close(); - topicPoliciesService = null; - } if (client != null) { client.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 18b4c610a5c9b..59bb5f2337fff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -254,7 +254,7 @@ public CompletableFuture> getTopicPoliciesAsync(TopicNam // initialization by calling this method. At the moment, the load manager does not start so the lookup // for "__change_events" will fail. In this case, just return an empty policies to avoid deadlock. final var loadManager = pulsarService.getLoadManager().get(); - if (loadManager == null || !loadManager.started()) { + if (loadManager == null || !loadManager.started() || closed.get()) { return CompletableFuture.completedFuture(Optional.empty()); } final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); @@ -308,6 +308,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { @VisibleForTesting @Nonnull CompletableFuture prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) { requireNonNull(namespace); + if (closed.get()) { + return CompletableFuture.completedFuture(false); + } return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenCompose(namespacePolicies -> { if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) { @@ -331,6 +334,9 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { }); initFuture.exceptionally(ex -> { try { + if (closed.get()) { + return null; + } log.error("[{}] Failed to create reader on __change_events topic", namespace, ex); cleanCacheAndCloseReader(namespace, false); From c4de2c69ae8987ad717117d03698d78ce3e11cb9 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 24 Sep 2024 23:44:16 +0800 Subject: [PATCH 4/9] Avoid blocking the thread that the topic policies reader is created --- .../SystemTopicBasedTopicPoliciesService.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 59bb5f2337fff..6ff6408916b1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -687,14 +687,22 @@ public void close() throws Exception { if (closed.compareAndSet(false, true)) { writerCaches.synchronous().invalidateAll(); readerCaches.values().forEach(future -> { - if (future != null && !future.isCompletedExceptionally()) { - future.thenAccept(reader -> { - try { - reader.close(); - } catch (Exception e) { - log.error("Failed to close reader.", e); - } - }); + try { + final var reader = future.getNow(null); + if (reader != null) { + reader.close(); + log.info("Closed the reader for topic policies"); + } else { + // Avoid blocking the thread that the reader is created + future.thenAccept(SystemTopicClient.Reader::closeAsync).whenComplete((__, e) -> { + if (e == null) { + log.info("Closed the reader for topic policies"); + } else { + log.error("Failed to close the reader for topic policies", e); + } + }); + } + } catch (Throwable ignored) { } }); readerCaches.clear(); From 810bd04dc16d8c478e7b99dec6bcc3543ce0036f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Sep 2024 00:57:23 +0800 Subject: [PATCH 5/9] Tombstone the bundle gracefully --- .../channel/ServiceUnitStateChannelImpl.java | 31 ++++++++++++++++--- .../ServiceUnitStateTableViewImpl.java | 19 ++++++++++-- .../pulsar/client/impl/TableViewImpl.java | 7 ++--- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a80b38c7df8af..5c260ad9c2ccb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -412,9 +412,7 @@ public CompletableFuture isChannelOwnerAsync() { if (owner.isPresent()) { return isTargetBroker(owner.get()); } else { - String msg = "There is no channel owner now."; - log.error(msg); - throw new IllegalStateException(msg); + throw new IllegalStateException("There is no channel owner now."); } }); } @@ -851,7 +849,7 @@ private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) { } } - private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { + private CompletableFuture handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { getOwnerRequest.complete(null); @@ -865,8 +863,10 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { : CompletableFuture.completedFuture(0)).thenApply(__ -> null); stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + return future; } else { stateChangeListeners.notify(serviceUnit, data, null); + return CompletableFuture.completedFuture(null); } } @@ -1385,8 +1385,10 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { long started = System.currentTimeMillis(); + final var futures = new HashMap>(); while (System.currentTimeMillis() - started < maxWaitTimeInMillis) { boolean cleaned = true; + futures.clear(); for (var etr : tableview.entrySet()) { var 
serviceUnit = etr.getKey(); var data = etr.getValue(); @@ -1395,7 +1397,9 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max continue; } - if (data.state() == Owned && broker.equals(data.dstBroker())) { + if (data.state() == Free) { + futures.put(serviceUnit, handleFreeEvent(serviceUnit, data)); + } else if (data.state() == Owned && broker.equals(data.dstBroker())) { cleaned = false; break; } @@ -1411,6 +1415,18 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max } } } + var waitTimeMs = started + maxWaitTimeInMillis - System.currentTimeMillis(); + if (waitTimeMs < 0) { + waitTimeMs = 0; + } + try { + FutureUtil.waitForAll(futures.values()).get(waitTimeMs, MILLISECONDS); + } catch (ExecutionException e) { + log.error("Failed to tombstone {}", futures.keySet(), e.getCause()); + } catch (TimeoutException __) { + log.warn("Failed to tombstone {} in {} ms", futures.keySet(), waitTimeMs); + } catch (InterruptedException ignored) { + } log.info("Finished cleanup waiting for orphan broker:{}. 
Elapsed {} ms", brokerId, System.currentTimeMillis() - started); } @@ -1432,6 +1448,11 @@ private synchronized void doCleanup(String broker, boolean gracefully) { log.info("Started ownership cleanup for the inactive broker:{}", broker); int orphanServiceUnitCleanupCnt = 0; long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); + try { + tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS); + } catch (Exception e) { + log.error("Failed to flush", e); + } Map orphanSystemServiceUnits = new HashMap<>(); for (var etr : tableview.entrySet()) { var stateData = etr.getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java index 8dfaddcdabca1..12cf87445a3dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.TopicDomain; @@ -144,8 +145,13 @@ public CompletableFuture put(String key, ServiceUnitStateData value) { .sendAsync() .whenComplete((messageId, e) -> { if (e != null) { - log.error("Failed to publish the message: serviceUnit:{}, data:{}", - key, value, e); + if (e instanceof PulsarClientException.AlreadyClosedException) { + log.info("Skip publishing the message since the producer is closed, serviceUnit: {}, data: " + + "{}", key, value); + } else { + log.error("Failed to 
publish the message: serviceUnit:{}, data:{}", + key, value, e); + } future.completeExceptionally(e); } else { future.complete(null); @@ -159,7 +165,14 @@ public void flush(long waitDurationInMillis) throws InterruptedException, Timeou if (!isValidState()) { throw new IllegalStateException(INVALID_STATE_ERROR_MSG); } - producer.flushAsync().get(waitDurationInMillis, MILLISECONDS); + final var deadline = System.currentTimeMillis() + waitDurationInMillis; + var waitTimeMs = waitDurationInMillis; + producer.flushAsync().get(waitTimeMs, MILLISECONDS); + waitTimeMs = deadline - System.currentTimeMillis(); + if (waitTimeMs < 0) { + waitTimeMs = 0; + } + tableview.refreshAsync().get(waitTimeMs, MILLISECONDS); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index d5d4174ee10a9..4f52060497864 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -364,8 +364,8 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading existing messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading existing messages.", + reader.getTopic()); } else { log.warn("Reader {} was interrupted while reading existing messages. 
", reader.getTopic(), ex); @@ -393,8 +393,7 @@ private void readTailMessages(Reader reader) { readTailMessages(reader); }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.error("Reader {} was closed while reading tail messages.", - reader.getTopic(), ex); + log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); // Fail all refresh request when no more messages can be read. pendingRefreshRequests.keySet().forEach(future -> { pendingRefreshRequests.remove(future); From c6540d9a73b86f6965b6cf43b3a3c0bf3ff79713 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Sep 2024 12:01:24 +0800 Subject: [PATCH 6/9] Fix load data store related issues --- .../extensions/ExtensibleLoadManagerImpl.java | 31 +++++++++++++++++++ .../channel/ServiceUnitStateChannelImpl.java | 6 +++- .../ServiceUnitStateDataConflictResolver.java | 4 +-- .../filter/BrokerMaxTopicCountFilter.java | 7 ++++- .../ExtensibleLoadManagerCloseTest.java | 1 + 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 98ef6bf36edac..7be7ca1f89827 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -182,6 +182,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private SplitManager splitManager; volatile boolean started = false; + boolean disabling = false; private boolean configuredSystemTopics = false; @@ -808,6 +809,9 @@ public static boolean isInternalTopic(String topic) { @VisibleForTesting synchronized void playLeader() { + if (disabling) { + return; + } log.info("This broker:{} is 
setting the role from {} to {}", pulsar.getBrokerId(), role, Leader); int retry = 0; @@ -835,6 +839,10 @@ synchronized void playLeader() { } break; } catch (Throwable e) { + if (disabling) { + log.warn("The broker:{} failed to playLeader, exit because it's disabled", pulsar.getBrokerId()); + return; + } log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -846,6 +854,9 @@ synchronized void playLeader() { } } } + if (disabling) { + return; + } if (becameFollower) { log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId()); @@ -863,6 +874,9 @@ synchronized void playLeader() { @VisibleForTesting synchronized void playFollower() { + if (disabling) { + return; + } log.info("This broker:{} is setting the role from {} to {}", pulsar.getBrokerId(), role, Follower); int retry = 0; @@ -885,6 +899,10 @@ synchronized void playFollower() { serviceUnitStateTableViewSyncer.close(); break; } catch (Throwable e) { + if (disabling) { + log.warn("The broker:{} failed to playFollower, exit because it's disabled", pulsar.getBrokerId()); + return; + } log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { @@ -896,6 +914,9 @@ synchronized void playFollower() { } } } + if (disabling) { + return; + } if (becameLeader) { log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId()); @@ -982,9 +1003,19 @@ protected void monitor() { } public void disableBroker() throws Exception { + // TopicDoesNotExistException might be thrown and it's not recoverable. Enable this flag to exit playFollower() + // or playLeader() quickly. 
+ synchronized (this) { + disabling = true; + } serviceUnitStateChannel.cleanOwnerships(); leaderElectionService.close(); brokerRegistry.unregister(); + final var availableBrokers = brokerRegistry.getAvailableBrokersAsync() + .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + if (availableBrokers.isEmpty()) { + close(); + } // Close the internal topics (if owned any) after giving up the possible leader role, // so that the subsequent lookups could hit the next leader. closeInternalTopics(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 5c260ad9c2ccb..cdfddc2d62b4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1408,10 +1408,14 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max break; } else { try { - MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); + tableview.flush(OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); } catch (InterruptedException e) { log.warn("Interrupted while delaying the next service unit clean-up. 
Cleaning broker:{}", brokerId); + } catch (ExecutionException e) { + log.error("Failed to flush table view", e.getCause()); + } catch (TimeoutException e) { + log.warn("Failed to flush the table view in {} ms", OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java index b1dbb6fac8709..d812842c26271 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java @@ -67,7 +67,7 @@ public void checkBrokers(boolean check) { @Override public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { - if (to == null) { + if (to == null || to.state() == ServiceUnitState.Free) { return false; } @@ -145,4 +145,4 @@ private boolean invalidUnload(ServiceUnitStateData from, ServiceUnitStateData to || !from.dstBroker().equals(to.sourceBroker()) || from.dstBroker().equals(to.dstBroker()); } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java index 48213c18e6376..9863d05ee751e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java @@ -41,7 +41,12 @@ public CompletableFuture> filterAsync(Map { - Optional brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + final Optional 
brokerLoadDataOpt; + try { + brokerLoadDataOpt = context.brokerLoadDataStore().get(broker); + } catch (IllegalStateException ignored) { + return false; + } long topics = brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0L); // TODO: The broker load data might be delayed, so the max topic check might not accurate. return topics >= loadBalancerBrokerMaxTopics; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index f345fae5d0a07..ecaa72d40e9b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -38,6 +38,7 @@ import org.testng.annotations.Test; @Slf4j +@Test(groups = "broker") public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; From 9b5d390f888fff34d9f7fdb108411cfa40cc768a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Sep 2024 13:16:04 +0800 Subject: [PATCH 7/9] Cleanup BK for each test --- .../loadbalance/extensions/ExtensibleLoadManagerCloseTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index ecaa72d40e9b3..fa63ce566c603 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -42,11 +42,12 @@ public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; - 
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); private final List brokers = new ArrayList<>(); + private LocalBookkeeperEnsemble bk; @BeforeClass(alwaysRun = true) public void setup() throws Exception { + bk = new LocalBookkeeperEnsemble(1, 0, () -> 0); bk.start(); } From 95bc70515721f8974772d4bda462fe052460ea16 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Sep 2024 17:10:15 +0800 Subject: [PATCH 8/9] Restore handleSkippedEvent and revert the change to shouldKeepLeft --- .../channel/ServiceUnitStateChannelImpl.java | 8 ++++++-- .../ServiceUnitStateDataConflictResolver.java | 2 +- .../ServiceUnitStateMetadataStoreTableViewImpl.java | 4 +++- .../channel/ServiceUnitStateTableView.java | 8 +++++++- .../channel/ServiceUnitStateTableViewImpl.java | 13 +++++++++++-- .../channel/ServiceUnitStateTableViewSyncer.java | 12 ++++++++---- 6 files changed, 36 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index cdfddc2d62b4a..5725249769740 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -307,7 +307,7 @@ public synchronized void start() throws PulsarServerException { pulsar.getConfiguration().getDefaultNumberOfNamespaceBundles()); tableview = createServiceUnitStateTableView(); - tableview.start(pulsar, this::handleEvent, this::handleExisting); + tableview.start(pulsar, this::handleEvent, this::handleExisting, this::handleSkippedEvent); if (debug) { log.info("Successfully started the channel tableview."); @@ -772,7 +772,11 @@ brokerId, getLogEventTag(data), serviceUnit, } } - private void 
handleSkippedEvent(String serviceUnit) { + private void handleSkippedEvent(String serviceUnit, ServiceUnitStateData skippedData) { + if (skippedData.state() == Free) { + handleFreeEvent(serviceUnit, skippedData); + return; + } var getOwnerRequest = getOwnerRequests.get(serviceUnit); if (getOwnerRequest != null) { var data = tableview.get(serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java index d812842c26271..3e43237f4c00e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateDataConflictResolver.java @@ -67,7 +67,7 @@ public void checkBrokers(boolean check) { @Override public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { - if (to == null || to.state() == ServiceUnitState.Free) { + if (to == null) { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java index f488b31c77415..283381a2e14bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java @@ -56,11 +56,13 @@ public class ServiceUnitStateMetadataStoreTableViewImpl extends ServiceUnitState public void start(PulsarService pulsar, BiConsumer tailItemListener, - BiConsumer existingItemListener) + BiConsumer 
existingItemListener, + BiConsumer skippedItemListener) throws MetadataStoreException { init(pulsar); conflictResolver = new ServiceUnitStateDataConflictResolver(); conflictResolver.setStorageType(MetadataStore); + conflictResolver.setSkippedMsgHandler(skippedItemListener); tableview = new MetadataStoreTableViewImpl<>(ServiceUnitStateData.class, pulsar.getBrokerId(), pulsar.getLocalMetadataStore(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java index 5ac57fe5c19c6..b55e44a0cc7e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableView.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.naming.NamespaceBundle; /** @@ -37,6 +39,8 @@ * ServiceUnitStateTableView receives notifications whenever ownership states are updated in the remote store, and * upon notification, it applies the updates to its local tableview with the listener logic. 
*/ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate public interface ServiceUnitStateTableView extends Closeable { /** @@ -46,11 +50,13 @@ public interface ServiceUnitStateTableView extends Closeable { * @param pulsar pulsar service reference * @param tailItemListener listener to listen tail(newly updated) items * @param existingItemListener listener to listen existing items + * @param skippedItemListener listener for items that are skipped by the topic compaction strategy * @throws IOException if it fails to init the tableview. */ void start(PulsarService pulsar, BiConsumer tailItemListener, - BiConsumer existingItemListener) throws IOException; + BiConsumer existingItemListener, + BiConsumer skippedItemListener) throws IOException; /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java index 12cf87445a3dd..91f4d12df80d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.java @@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; import java.io.IOException; import java.util.Map; import java.util.Set; @@ -38,6 +39,7 @@ import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; @Slf4j public class ServiceUnitStateTableViewImpl extends ServiceUnitStateTableViewBase { @@ -53,7 +55,8 @@ public class 
ServiceUnitStateTableViewImpl extends ServiceUnitStateTableViewBase public void start(PulsarService pulsar, BiConsumer tailItemListener, - BiConsumer existingItemListener) throws IOException { + BiConsumer existingItemListener, + BiConsumer skippedItemListener) throws IOException { boolean debug = ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log); init(pulsar); @@ -98,7 +101,13 @@ public void start(PulsarService pulsar, tableview.listen(tailItemListener); tableview.forEach(this::updateOwnedServiceUnits); tableview.forEach(existingItemListener); - + final var strategy = (ServiceUnitStateDataConflictResolver) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); + if (strategy == null) { + String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; + log.error(err); + throw new IllegalStateException(err); + } + strategy.setSkippedMsgHandler(skippedItemListener); } private boolean isValidState() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java index 10ab39a66d279..42550eaa43963 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java @@ -100,7 +100,8 @@ private void syncExistingItems() metadataStoreTableView.start( pulsar, this::dummy, - this::dummy + this::dummy, + (__, ___) -> {} ); @Cleanup @@ -108,7 +109,8 @@ private void syncExistingItems() systemTopicTableView.start( pulsar, this::dummy, - this::dummy + this::dummy, + (__, ___) -> {} ); @@ -152,7 +154,8 @@ private void syncTailItems() throws InterruptedException, IOException, TimeoutEx this.metadataStoreTableView.start( pulsar, this::syncToSystemTopic, - this::dummy + this::dummy, + 
(__, ___) -> {} ); log.info("Started MetadataStoreTableView"); @@ -160,7 +163,8 @@ private void syncTailItems() throws InterruptedException, IOException, TimeoutEx this.systemTopicTableView.start( pulsar, this::syncToMetadataStore, - this::dummy + this::dummy, + (__, ___) -> {} ); log.info("Started SystemTopicTableView"); From a9e9893c63f2fa806c7d810ba343bc8b750be224 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 25 Sep 2024 19:54:59 +0800 Subject: [PATCH 9/9] Fail all the lookup requests after cleanOwnerships() is called --- .../channel/ServiceUnitStateChannelImpl.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 5725249769740..0c274c972bcea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -141,6 +141,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private volatile long lastOwnEventHandledAt = 0; private long lastOwnedServiceUnitCountAt = 0; private int totalOwnedServiceUnitCnt = 0; + private volatile boolean disablePubOwnedEvent = false; public enum EventType { Assign, @@ -255,6 +256,7 @@ public void cancelOwnershipMonitor() { @Override public void cleanOwnerships() { + disablePubOwnedEvent = true; doCleanup(brokerId, true); } @@ -817,6 +819,15 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.dstBroker())) { + if (disablePubOwnedEvent) { + log.info("Skip assigning self({}) as the owner after cleanOwnerships", 
serviceUnit); + final var getOwnerRequest = getOwnerRequests.remove(serviceUnit); + if (getOwnerRequest != null) { + getOwnerRequest.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( + "lookup during ownership cleanup")); + } + return; + } ServiceUnitStateData next = new ServiceUnitStateData( Owned, data.dstBroker(), data.sourceBroker(), getNextVersionId(data)); stateChangeListeners.notifyOnCompletion(pubAsync(serviceUnit, next), serviceUnit, data) @@ -1439,7 +1450,7 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max System.currentTimeMillis() - started); } - private synchronized void doCleanup(String broker, boolean gracefully) { + private void doCleanup(String broker, boolean gracefully) { try { if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS) .isEmpty()) {