Skip to content

Commit

Permalink
[improve] [pip] PIP-354: apply topK mechanism to ModularLoadManagerIm…
Browse files Browse the repository at this point in the history
…pl (#22753)

Implementation PR for #22765

### Motivation
`ModularLoadManagerImpl` relies on zk to store and synchronize load metadata, which puts great pressure on zk and threatens the stability of the system. Every broker uploads its `LocalBrokerData` to zk; the leader broker retrieves all `LocalBrokerData` from zk, generates all `BundleData` from each `LocalBrokerData`, and updates all `BundleData` in zk.

As every bundle in the cluster corresponds to a zk node, it is common for a cluster to have thousands of zk nodes, which results in thousands of read/update operations against zk. This puts a lot of pressure on zk.

**As all load shedding algorithms pick bundles from top to bottom based on throughput/msgRate, bundles with low throughput/msgRate are rarely selected for shedding, so we don't need to include these bundles in the bundle load report.**


### Modifications

Reuse the `loadBalancerMaxNumberOfBundlesInBundleLoadReport` configuration of ExtensibleLoadManager and apply the topK mechanism to ModularLoadManagerImpl.
  • Loading branch information
thetumbled authored May 28, 2024
1 parent c25d7b2 commit f5a00d8
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 10 deletions.
5 changes: 4 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,10 @@ loadBalancerBrokerLoadDataTTLInSeconds=1800
# The load balancer distributes bundles across brokers,
# based on topK bundle load data and other broker load data.
# The bigger value will increase the overhead of reporting many bundles in load data.
# (only used in load balancer extension logics)
# Used for ExtensibleLoadManagerImpl and ModularLoadManagerImpl, default value is 10.
# User can disable the bundle filtering feature of ModularLoadManagerImpl by setting this value to -1.
# Enabling this feature can reduce the pressure on the zookeeper when doing load report.
# WARNING: too small value could result in a long load balance time.
loadBalancerMaxNumberOfBundlesInBundleLoadReport=10

# Service units'(bundles) split interval. Broker periodically checks whether
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2662,7 +2662,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ "The load balancer distributes bundles across brokers, "
+ "based on topK bundle load data and other broker load data."
+ "The bigger value will increase the overhead of reporting many bundles in load data. "
+ "(only used in load balancer extension logics)"
+ "Used for ExtensibleLoadManagerImpl and ModularLoadManagerImpl, default value is 10. "
+ "User can disable the bundle filtering feature of ModularLoadManagerImpl by setting to -1."
+ "Enabling this feature can reduce the pressure on the zookeeper when doing load report."
+ "WARNING: too small value could result in a long load balance time."
)
private int loadBalancerMaxNumberOfBundlesInBundleLoadReport = 10;
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void update(Map<String, NamespaceBundleStats> bundleStats, int topk) {
}
}

static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int k) {
public static void partitionSort(List<Map.Entry<String, ? extends Comparable>> arr, int k) {
int start = 0;
int end = arr.size() - 1;
int target = k - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -187,6 +188,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private final Lock lock = new ReentrantLock();
private final Set<String> knownBrokers = new HashSet<>();
private Map<String, String> bundleBrokerAffinityMap;
// reusable list used for sorting and selecting the topK bundles
private final List<Map.Entry<String, ? extends Comparable>> bundleArr = new ArrayList<>();


/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand Down Expand Up @@ -1122,6 +1126,32 @@ public void writeBrokerDataOnZooKeeper(boolean force) {
}
}

/**
 * Sorts the aggregated bundle data by load and selects the topK bundles for each broker.
 *
 * <p>{@code bundleArr} is cleared and refilled from {@code loadData.getBundleData()} on every call, so
 * callers must guard it against concurrent use. When
 * {@code loadBalancerMaxNumberOfBundlesInBundleLoadReport} is zero or negative, filtering is disabled:
 * every bundle is selected and no sorting is performed. Otherwise at most {@code topK * brokerCount}
 * bundles are selected and {@code TopKBundles.partitionSort} is applied so that the caller can take the
 * first {@code N} entries of {@code bundleArr}, where {@code N} is the returned count.
 *
 * @return the number of bundles selected, always in the range {@code [0, bundleArr.size()]}
 */
private int selectTopKBundle() {
    bundleArr.clear();
    bundleArr.addAll(loadData.getBundleData().entrySet());

    int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
            .getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
    if (maxNumberOfBundlesInBundleLoadReport <= 0) {
        // Filtering disabled: select all bundles.
        return bundleArr.size();
    }

    // Select topK bundles for each broker, i.e. topK * brokerCount bundles in total.
    int brokerCount = Math.max(1, loadData.getBrokerData().size());
    // Multiply in long: with a large configured topK the int product would overflow to a negative
    // value, which would then be passed to partitionSort and to the caller as a negative count.
    long requestedBundleCount = (long) maxNumberOfBundlesInBundleLoadReport * brokerCount;
    // The result is bounded by bundleArr.size(), so narrowing back to int is safe.
    int updateBundleCount = (int) Math.min(requestedBundleCount, bundleArr.size());
    if (updateBundleCount == 0) {
        // No bundle to update.
        return 0;
    }
    TopKBundles.partitionSort(bundleArr, updateBundleCount);
    return updateBundleCount;
}

/**
* As the leader broker, write bundle data aggregated from all brokers to metadata store.
*/
Expand All @@ -1131,11 +1161,12 @@ public void writeBundleDataOnZooKeeper() {
// Write the bundle data to metadata store.
List<CompletableFuture<Void>> futures = new ArrayList<>();

for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
final String bundle = entry.getKey();
final BundleData data = entry.getValue();
futures.add(
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle, data));
// use synchronized to protect bundleArr.
synchronized (bundleArr) {
int updateBundleCount = selectTopKBundle();
bundleArr.stream().limit(updateBundleCount).forEach(entry -> futures.add(
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(
entry.getKey(), (BundleData) entry.getValue())));
}

// Write the time average broker data to metadata store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.lang.reflect.Method;
import java.net.URL;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -425,6 +426,61 @@ public void testMaxTopicDistributionToBroker() throws Exception {
}
}

/**
 * Verifies that the load manager of the leader broker writes only topK * brokerCount bundles to zk,
 * leaving the remaining bundle nodes at their initial version.
 */
@Test
public void testFilterBundlesWhileWritingToMetadataStore() throws Exception {
    // Index both brokers by web service address so the current leader can be resolved from its broker id.
    Map<String, PulsarService> brokersByAddress = new HashMap<>();
    brokersByAddress.put(pulsar1.getWebServiceAddress(), pulsar1);
    brokersByAddress.put(pulsar2.getWebServiceAddress(), pulsar2);
    MetadataCache<BundleData> bundleDataCache =
            pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class);
    String protocol = "http://";
    String leaderBrokerId = pulsar1.getLeaderElectionService().getCurrentLeader().get().getBrokerId();
    PulsarService leaderBroker = brokersByAddress.get(protocol + leaderBrokerId);
    ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) getField(
            leaderBroker.getLoadManager().get(), "loadManager");

    int topK = 1;
    leaderBroker.getConfiguration().setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(topK);
    // Two brokers are registered above, so topK * 2 bundles are expected to be exported.
    int exportBundleCount = topK * 2;

    // Create the bundles and precompute the metadata-store path of each one.
    final int totalBundles = 5;
    final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles(
            nsFactory, "test", "test", "test", totalBundles);
    final String[] bundleDataPaths = new String[totalBundles];
    for (int idx = 0; idx < totalBundles; idx++) {
        bundleDataPaths[idx] = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[idx]);
    }

    // Register each bundle in the in-memory load data and create its node in the metadata store.
    LoadData loadData = (LoadData) getField(loadManager, "loadData");
    final String leaderBrokerKey = leaderBroker.getWebServiceAddress().substring(protocol.length());
    for (int idx = 0; idx < totalBundles; idx++) {
        final String bundleName = bundles[idx].toString();
        // Long-term load grows with the index, so higher-indexed bundles compare as more loaded.
        final int load = 1000 * idx;
        final TimeAverageMessageData longTermData = new TimeAverageMessageData(1000);
        longTermData.setMsgThroughputIn(load);
        longTermData.setMsgThroughputOut(load);
        longTermData.setMsgRateIn(load);
        longTermData.setNumSamples(1000);

        final BundleData bundleData = new BundleData(10, 1000);
        bundleData.setLongTermData(longTermData);
        loadData.getBundleData().put(bundleName, bundleData);
        loadData.getBrokerData().get(leaderBrokerKey)
                .getLocalData().getLastStats().put(bundleName, new NamespaceBundleStats());
        bundleDataCache.create(bundleDataPaths[idx], bundleData).join();
    }

    // Every freshly created node must start at version 0.
    for (String path : bundleDataPaths) {
        assertEquals(bundleDataCache.getWithStats(path).get().get().getStat().getVersion(), 0);
    }

    // Write bundle data to zk, then verify that only the most loaded bundles were updated.
    loadManager.writeBundleDataOnZooKeeper();
    int filterBundleCount = totalBundles - exportBundleCount;
    for (int idx = 0; idx < totalBundles; idx++) {
        // Filtered (least loaded) bundles stay at version 0; exported bundles are bumped to version 1.
        final int expectedVersion = idx < filterBundleCount ? 0 : 1;
        assertEquals(bundleDataCache.getWithStats(bundleDataPaths[idx]).get().get().getStat().getVersion(),
                expectedVersion);
    }
}

// Test that load shedding works
@Test
public void testLoadShedding() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;

import lombok.EqualsAndHashCode;

/**
* Data class comprising the short term and long term historical data for this bundle.
*/
public class BundleData {
@EqualsAndHashCode
public class BundleData implements Comparable<BundleData> {
// Short term data for this bundle. The time frame of this data is
// determined by the number of short term samples
// and the bundle update period.
Expand Down Expand Up @@ -103,4 +106,13 @@ public int getTopics() {
public void setTopics(int topics) {
this.topics = topics;
}

/**
 * Orders bundle data by short term data first; long term data is only consulted when the short term
 * data compares as equal.
 *
 * @param o the bundle data to compare against
 * @return a negative, zero or positive value as this bundle data compares lower than, equal to or
 *         higher than {@code o}
 */
@Override
public int compareTo(BundleData o) {
    final int shortTermOrder = this.shortTermData.compareTo(o.shortTermData);
    if (shortTermOrder != 0) {
        return shortTermOrder;
    }
    // Short term data is a tie: fall back to the long term data.
    return this.longTermData.compareTo(o.longTermData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.policies.data.loadbalancer;

import lombok.EqualsAndHashCode;

/**
* Data class comprising the average message data over a fixed period of time.
*/
public class TimeAverageMessageData {
@EqualsAndHashCode
public class TimeAverageMessageData implements Comparable<TimeAverageMessageData> {
// The maximum number of samples this data will consider.
private int maxSamples;

Expand All @@ -41,6 +44,11 @@ public class TimeAverageMessageData {
// The average message rate out per second.
private double msgRateOut;

// Consider the throughput equal if difference is less than 100 KB/s
private static final double throughputDifferenceThreshold = 1e5;
// Consider the msgRate equal if the difference is less than 100
private static final double msgRateDifferenceThreshold = 100;

// For JSON only.
public TimeAverageMessageData() {
}
Expand Down Expand Up @@ -177,4 +185,40 @@ public double totalMsgRate() {
public double totalMsgThroughput() {
return msgThroughputIn + msgThroughputOut;
}

/**
 * Compares by inbound throughput, then outbound throughput, then total message rate. A later
 * criterion is only consulted when every earlier one reports a tie.
 *
 * @param other the data to compare against
 * @return the result of the first criterion that does not report a tie, or 0 if all of them tie
 */
@Override
public int compareTo(TimeAverageMessageData other) {
    final int bandwidthInOrder = this.compareByBandwidthIn(other);
    if (bandwidthInOrder != 0) {
        return bandwidthInOrder;
    }
    final int bandwidthOutOrder = this.compareByBandwidthOut(other);
    if (bandwidthOutOrder != 0) {
        return bandwidthOutOrder;
    }
    return this.compareByMsgRate(other);
}

/**
 * Compares the total message rate (in + out) of this data with that of {@code other}.
 *
 * @param other the data to compare against
 * @return 0 when the totals differ by no more than {@code msgRateDifferenceThreshold}; otherwise the
 *         result of {@link Double#compare(double, double)} on the two totals
 */
public int compareByMsgRate(TimeAverageMessageData other) {
    final double ownTotalRate = this.msgRateIn + this.msgRateOut;
    final double otherTotalRate = other.msgRateIn + other.msgRateOut;
    final double gap = Math.abs(ownTotalRate - otherTotalRate);
    // Differences within the threshold are treated as a tie.
    return gap > msgRateDifferenceThreshold ? Double.compare(ownTotalRate, otherTotalRate) : 0;
}

/**
 * Compares the inbound message throughput of this data with that of {@code other}.
 *
 * @param other the data to compare against
 * @return 0 when the two values differ by no more than {@code throughputDifferenceThreshold};
 *         otherwise the result of {@link Double#compare(double, double)} on the two values
 */
public int compareByBandwidthIn(TimeAverageMessageData other) {
    final double gap = Math.abs(this.msgThroughputIn - other.msgThroughputIn);
    // Differences within the threshold are treated as a tie.
    return gap > throughputDifferenceThreshold
            ? Double.compare(this.msgThroughputIn, other.msgThroughputIn)
            : 0;
}

/**
 * Compares the outbound message throughput of this data with that of {@code other}.
 *
 * @param other the data to compare against
 * @return 0 when the two values differ by no more than {@code throughputDifferenceThreshold};
 *         otherwise the result of {@link Double#compare(double, double)} on the two values
 */
public int compareByBandwidthOut(TimeAverageMessageData other) {
    final double gap = Math.abs(this.msgThroughputOut - other.msgThroughputOut);
    // Differences within the threshold are treated as a tie.
    return gap > throughputDifferenceThreshold
            ? Double.compare(this.msgThroughputOut, other.msgThroughputOut)
            : 0;
}
}

0 comments on commit f5a00d8

Please sign in to comment.