Skip to content

Commit

Permalink
[fix][broker] Fix the broker registering might be blocked for long ti…
Browse files Browse the repository at this point in the history
…me (#23371)

(cherry picked from commit 7d7dc80)
  • Loading branch information
BewareMyPower authored and heesung-sn committed Oct 23, 2024
1 parent d42dacb commit f3008a4
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,6 @@ public BrokerRegistryImpl(PulsarService pulsar) {
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
this.listeners.add((broker, notificationType) -> {
if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) {
registerAsync();
}
});
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
Expand Down Expand Up @@ -222,11 +215,16 @@ private void handleMetadataStoreNotification(Notification t) {
if (log.isDebugEnabled()) {
log.debug("Handle notification: [{}]", t);
}
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
registerAsync();
}
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
String brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,15 @@ protected void setup() throws Exception {

@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
final var startMs = System.currentTimeMillis();
if (pulsar != null) {
pulsar.close();
}
final var elapsedMs = System.currentTimeMillis() - startMs;
bk.stop();
if (elapsedMs > 5000) {
throw new RuntimeException("Broker took " + elapsedMs + "ms to close");
}
}

@Test
Expand Down Expand Up @@ -105,7 +110,7 @@ public void testRegisterAgain() throws Exception {
});
}

private ServiceConfiguration brokerConfig() {
protected ServiceConfiguration brokerConfig() {
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateMetadataStoreTableViewImpl;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class BrokerRegistryMetadataStoreIntegrationTest extends BrokerRegistryIntegrationTest {

@Override
protected ServiceConfiguration brokerConfig() {
final var config = super.brokerConfig();
config.setLoadManagerServiceUnitStateTableViewClassName(
ServiceUnitStateMetadataStoreTableViewImpl.class.getName());
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ public void testRefreshAPI(int partition) throws Exception {
TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
.topic(topic)
.create();
// Verify refresh can handle the case when the topic is empty
tv.refreshAsync().get(3, TimeUnit.SECONDS);

// 2. Add a listen action to provide the test environment.
// The listen action will be triggered when there are incoming messages every time.
// This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
Expand Down Expand Up @@ -259,7 +260,11 @@ private void handleMessage(Message<T> msg) {
@Override
public CompletableFuture<Void> refreshAsync() {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
reader.thenCompose(reader -> getLastMessageIds(reader).thenAccept(lastMessageIds -> {
reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
if (lastMessageIds.isEmpty()) {
completableFuture.complete(null);
return;
}
// After get the response of lastMessageIds, put the future and result into `refreshMap`
// and then filter out partitions that has been read to the lastMessageID.
pendingRefreshRequests.put(completableFuture, lastMessageIds);
Expand Down Expand Up @@ -291,22 +296,28 @@ private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader) {
AtomicLong messagesRead = new AtomicLong();

CompletableFuture<Void> future = new CompletableFuture<>();
getLastMessageIds(reader).thenAccept(maxMessageIds -> {
readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
if (lastMessageIds.isEmpty()) {
future.complete(null);
return;
}
readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}

private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIds(Reader<T> reader) {
private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIdOfNonEmptyTopics(Reader<T> reader) {
return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();
Map<String, TopicMessageId> lastMessageIdMap = new ConcurrentHashMap<>();
lastMessageIds.forEach(topicMessageId -> {
maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) {
lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId);
} // else: a negative entry id represents an empty topic so that we don't have to read messages from it
});
return maxMessageIds;
return lastMessageIdMap;
});
}

Expand Down

0 comments on commit f3008a4

Please sign in to comment.