Skip to content

Commit

Permalink
[improve][broker] Exclude producers for geo-replication from publishe…
Browse files Browse the repository at this point in the history
…rs field of topic stats (apache#22556)
  • Loading branch information
massakam authored May 7, 2024
1 parent 1e19190 commit c30765e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand Down Expand Up @@ -746,8 +747,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
Expand All @@ -760,14 +760,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}

if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.increment();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.intValue();
bundleStats.producerCount += producerCount.intValue();

// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
Expand Down Expand Up @@ -856,7 +861,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
Expand Down Expand Up @@ -930,6 +935,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else if (!getStatsOptions.isExcludePublishers()) {
// Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -2257,8 +2258,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);

// start publisher stats
Expand All @@ -2272,14 +2272,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}

// Populate consumer specific stats here
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.increment();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.intValue();
bundleStats.producerCount += producerCount.intValue();

// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
Expand Down Expand Up @@ -2447,7 +2452,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0
: (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
Expand Down Expand Up @@ -2535,8 +2540,8 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions

if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
if (!getStatsOptions.isExcludePublishers()){
} else if (!getStatsOptions.isExcludePublishers()) {
// Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -138,17 +141,49 @@ public void testReplicatorProducerStatInTopic() throws Exception {

// Verify replicator works.
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).create(); // Do not publish messages
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);

// Verify there has one item in the attribute "publishers" or "replications"
// Verify that the "publishers" field does not include the producer for replication
TopicStats topicStats2 = admin2.topics().getStats(topicName);
assertTrue(topicStats2.getPublishers().size() + topicStats2.getReplication().size() > 0);
assertEquals(topicStats2.getPublishers().size(), 1);
assertFalse(topicStats2.getPublishers().get(0).getProducerName().startsWith(config1.getReplicatorPrefix()));

// Update broker stats immediately (usually updated every minute)
pulsar2.getBrokerService().updateRates();
String brokerStats2 = admin2.brokerStats().getTopics();

boolean found = false;
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(brokerStats2);
if (rootNode.hasNonNull(replicatedNamespace)) {
Iterator<JsonNode> bundleNodes = rootNode.get(replicatedNamespace).elements();
while (bundleNodes.hasNext()) {
JsonNode bundleNode = bundleNodes.next();
if (bundleNode.hasNonNull("persistent") && bundleNode.get("persistent").hasNonNull(topicName)) {
found = true;
JsonNode topicNode = bundleNode.get("persistent").get(topicName);
// Verify that the "publishers" field does not include the producer for replication
assertEquals(topicNode.get("publishers").size(), 1);
assertEquals(topicNode.get("producerCount").intValue(), 1);
Iterator<JsonNode> publisherNodes = topicNode.get("publishers").elements();
while (publisherNodes.hasNext()) {
JsonNode publisherNode = publisherNodes.next();
assertFalse(publisherNode.get("producerName").textValue()
.startsWith(config1.getReplicatorPrefix()));
}
break;
}
}
}
assertTrue(found);

// cleanup.
consumer2.close();
consumer2.unsubscribe();
producer2.close();
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
Expand Down

0 comments on commit c30765e

Please sign in to comment.