Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] [fix] [broker] Do not record a bundle-unloading into the topic load failed metrics #23334

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,11 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
if (t instanceof BrokerServiceException.BundleUnloadingException) {
pulsarStats.recordConcurrencyLoadTopicAndUnloadBundle();
} else {
pulsarStats.recordTopicLoadFailed();
}
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
Expand Down Expand Up @@ -1553,7 +1557,11 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
if (t instanceof BrokerServiceException.BundleUnloadingException) {
pulsarStats.recordConcurrencyLoadTopicAndUnloadBundle();
} else {
pulsarStats.recordTopicLoadFailed();
}
return null;
});

Expand Down Expand Up @@ -2226,7 +2234,7 @@ public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
+ "Please redo the lookup. Request is denied: namespace=%s",
topic, pulsar.getBrokerId(), topicName.getNamespace());
log.warn(msg);
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg));
return FutureUtil.failedFuture(new BrokerServiceException.BundleUnloadingException(msg));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public ServiceUnitNotReadyException(String msg, Throwable t) {
}
}

public static class BundleUnloadingException extends ServiceUnitNotReadyException {
public BundleUnloadingException(String msg) {
super(msg);
}

public BundleUnloadingException(String msg, Throwable t) {
super(msg, t);
}
}

public static class TopicClosedException extends BrokerServiceException {
public TopicClosedException(Throwable t) {
super(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public void recordTopicLoadFailed() {
brokerOperabilityMetrics.recordTopicLoadFailed();
}

public void recordConcurrencyLoadTopicAndUnloadBundle() {
brokerOperabilityMetrics.recordConcurrencyLoadTopicAndUnloadBundle();
}

public void recordConnectionCreate() {
brokerOperabilityMetrics.recordConnectionCreate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
*/
public class BrokerOperabilityMetrics implements AutoCloseable {
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
private static final Counter CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE =
Counter.build("concurrency_load_topic_and_unload_bundle", "-").register();
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
Expand Down Expand Up @@ -130,6 +132,8 @@ Map<String, String> getDimensionMap(String metricsName) {
Metrics getTopicLoadMetrics() {
Metrics metrics = getDimensionMetrics("pulsar_topic_load_times", "topic_load", topicLoadStats);
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
metrics.put("brk_concurrency_load_topic_and_unload_bundle_count",
CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE.get());
return metrics;
}

Expand Down Expand Up @@ -162,6 +166,10 @@ public void recordTopicLoadFailed() {
this.TOPIC_LOAD_FAILED.inc();
}

public void recordConcurrencyLoadTopicAndUnloadBundle() {
this.CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE.inc();
}

public void recordConnectionCreate() {
this.connectionTotalCreatedCount.increment();
this.connectionActive.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -78,6 +79,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -1674,6 +1676,18 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.topics().createNonPartitionedTopic(topic);
admin.topics().unload(topic);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since multiple threads are involved, shouldn't these be AtomicIntegers?

Copy link
Member

@lhotari lhotari Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't currently understand the reason to use MutableInts.
If the reason is to that a final variable is needed for an inner lambda, it might be better to use a final variable.
One trick that could be useful is to isolate variable scopes with a {....} block. That would allow to introduce the same variable name multiple times in a single unit test without variable name collisions since the variables would be in different scopes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
Expand All @@ -1686,19 +1700,14 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {

// Do test
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
return false;
}
double topic_load_failed_count = 0;
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) {
topic_load_failed_count += metric.value;
}
return topic_load_failed_count >= 1D;
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 > failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 == concurrencyLoadTopicAndUnloadBundle1.getValue());
});

// Remove the injection.
Expand All @@ -1710,6 +1719,74 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testMetricsPersistentTopicLoadFailsDueToBundleUnloading() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);
admin.namespaces().unload(namespace);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
if (failMarker.get() && op.equals(MockZooKeeper.Op.CREATE) &&
path.startsWith("/namespace/" + namespace)) {
return true;
}
return false;
});

// Do test
try {
pulsar.getBrokerService().loadOrCreatePersistentTopic(topic, true, Collections.emptyMap(), null).join();
} catch (Exception ex) {
// ignore, because we injected an error above.
}
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 == failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 > concurrencyLoadTopicAndUnloadBundle1.getValue());
});

// Remove the injection.
failMarker.set(false);
// cleanup.
httpClient.close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

private long parseLongMetric(String metricsResponse, String metricName) {
Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(metricsResponse);
if (!metricMap.containsKey(metricName)) {
return 0;
}
double counter = 0;
for (PrometheusMetricsClient.Metric metric :
metricMap.get(metricName)) {
counter += metric.value;
}
return Double.valueOf(counter).longValue();
}

@Test
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
Expand Down
Loading