Skip to content

Commit

Permalink
[improve][broker] Optimize and clean up aggregation of topic stats (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Oct 14, 2023
1 parent 421c98a commit d6a56ad
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
Objects.requireNonNull(stats);
super.add(stats);
this.msgDropRate += stats.msgDropRate;
for (int index = 0; index < stats.getNonPersistentPublishers().size(); index++) {
NonPersistentPublisherStats s = stats.getNonPersistentPublishers().get(index);
List<NonPersistentPublisherStats> publisherStats = stats.getNonPersistentPublishers();
for (int index = 0; index < publisherStats.size(); index++) {
NonPersistentPublisherStats s = publisherStats.get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
((NonPersistentPublisherStatsImpl) this.nonPersistentPublishersMap
.computeIfAbsent(s.getProducerName(), key -> {
Expand All @@ -181,46 +182,24 @@ public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
}
}

if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) {
for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
.add((NonPersistentSubscriptionStatsImpl)
stats.getNonPersistentSubscriptions().get(subscription)));
}
} else {
for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
if (this.getNonPersistentSubscriptions().get(subscription) != null) {
((NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions().get(subscription))
.add((NonPersistentSubscriptionStatsImpl)
stats.getNonPersistentSubscriptions().get(subscription));
} else {
NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
.add((NonPersistentSubscriptionStatsImpl)
stats.getNonPersistentSubscriptions().get(subscription)));
}
}
for (Map.Entry<String, NonPersistentSubscriptionStats> entry : stats.getNonPersistentSubscriptions()
.entrySet()) {
NonPersistentSubscriptionStatsImpl subscriptionStats =
(NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions()
.computeIfAbsent(entry.getKey(), k -> new NonPersistentSubscriptionStatsImpl());
subscriptionStats.add(
(NonPersistentSubscriptionStatsImpl) entry.getValue());
}

if (this.getNonPersistentReplicators().size() != stats.getNonPersistentReplicators().size()) {
for (String repl : stats.getNonPersistentReplicators().keySet()) {
NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
this.getNonPersistentReplicators().put(repl, replStats
.add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
}
} else {
for (String repl : stats.getNonPersistentReplicators().keySet()) {
if (this.getNonPersistentReplicators().get(repl) != null) {
((NonPersistentReplicatorStatsImpl) this.getNonPersistentReplicators().get(repl))
.add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl));
} else {
NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
this.getNonPersistentReplicators().put(repl, replStats
.add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
}
}
for (Map.Entry<String, NonPersistentReplicatorStats> entry : stats.getNonPersistentReplicators().entrySet()) {
NonPersistentReplicatorStatsImpl replStats = (NonPersistentReplicatorStatsImpl)
this.getNonPersistentReplicators().computeIfAbsent(entry.getKey(), k -> {
NonPersistentReplicatorStatsImpl r = new NonPersistentReplicatorStatsImpl();
return r;
});
replStats.add((NonPersistentReplicatorStatsImpl) entry.getValue());
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ public TopicStatsImpl add(TopicStats ts) {
topicMetricBean.value += v.value;
});

for (int index = 0; index < stats.getPublishers().size(); index++) {
PublisherStats s = stats.getPublishers().get(index);
List<? extends PublisherStats> publisherStats = stats.getPublishers();
for (int index = 0; index < publisherStats.size(); index++) {
PublisherStats s = publisherStats.get(index);
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
this.publishersMap.computeIfAbsent(s.getProducerName(), key -> {
final PublisherStatsImpl newStats = new PublisherStatsImpl();
Expand All @@ -284,38 +285,22 @@ public TopicStatsImpl add(TopicStats ts) {
}
}

if (this.subscriptions.size() != stats.subscriptions.size()) {
for (String subscription : stats.subscriptions.keySet()) {
SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
}
} else {
for (String subscription : stats.subscriptions.keySet()) {
if (this.subscriptions.get(subscription) != null) {
this.subscriptions.get(subscription).add(stats.subscriptions.get(subscription));
} else {
SubscriptionStatsImpl subscriptionStats = new SubscriptionStatsImpl();
this.subscriptions.put(subscription, subscriptionStats.add(stats.subscriptions.get(subscription)));
}
}
for (Map.Entry<String, SubscriptionStatsImpl> entry : stats.subscriptions.entrySet()) {
SubscriptionStatsImpl subscriptionStats =
this.subscriptions.computeIfAbsent(entry.getKey(), k -> new SubscriptionStatsImpl());
subscriptionStats.add(entry.getValue());
}
if (this.replication.size() != stats.replication.size()) {
for (String repl : stats.replication.keySet()) {
ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
replStats.setConnected(true);
this.replication.put(repl, replStats.add(stats.replication.get(repl)));
}
} else {
for (String repl : stats.replication.keySet()) {
if (this.replication.get(repl) != null) {
this.replication.get(repl).add(stats.replication.get(repl));
} else {
ReplicatorStatsImpl replStats = new ReplicatorStatsImpl();
replStats.setConnected(true);
this.replication.put(repl, replStats.add(stats.replication.get(repl)));
}
}

for (Map.Entry<String, ReplicatorStatsImpl> entry : stats.replication.entrySet()) {
ReplicatorStatsImpl replStats =
this.replication.computeIfAbsent(entry.getKey(), k -> {
ReplicatorStatsImpl r = new ReplicatorStatsImpl();
r.setConnected(true);
return r;
});
replStats.add(entry.getValue());
}

if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs != 0) {
earliestMsgPublishTimeInBacklogs = Math.min(
earliestMsgPublishTimeInBacklogs,
Expand Down

0 comments on commit d6a56ad

Please sign in to comment.