diff --git a/conf/broker.conf b/conf/broker.conf index f1ec8e7a09f89..d68b6c6ca61de 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1470,7 +1470,10 @@ loadBalancerBrokerLoadDataTTLInSeconds=1800 # The load balancer distributes bundles across brokers, # based on topK bundle load data and other broker load data. # The bigger value will increase the overhead of reporting many bundles in load data. -# (only used in load balancer extension logics) +# Used for ExtensibleLoadManagerImpl and ModularLoadManagerImpl, default value is 10. +# User can disable the bundle filtering feature of ModularLoadManagerImpl by setting this value to -1. +# Enabling this feature can reduce the pressure on the zookeeper when doing load report. +# WARNING: too small value could result in a long load balance time. loadBalancerMaxNumberOfBundlesInBundleLoadReport=10 # Service units'(bundles) split interval. Broker periodically checks whether diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c18ffe4bc1886..6e8820db27ca7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2662,7 +2662,10 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + "The load balancer distributes bundles across brokers, " + "based on topK bundle load data and other broker load data." + "The bigger value will increase the overhead of reporting many bundles in load data. " - + "(only used in load balancer extension logics)" + + "Used for ExtensibleLoadManagerImpl and ModularLoadManagerImpl, default value is 10. " + + "User can disable the bundle filtering feature of ModularLoadManagerImpl by setting to -1." + + "Enabling this feature can reduce the pressure on the zookeeper when doing load report." + + "WARNING: too small value could result in a long load balance time." ) private int loadBalancerMaxNumberOfBundlesInBundleLoadReport = 10; @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index 624546fdff837..ec26521af41f5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -99,7 +99,7 @@ public void update(Map bundleStats, int topk) { } } - static void partitionSort(List> arr, int k) { + public static void partitionSort(List> arr, int k) { int start = 0; int end = arr.size() - 1; int target = k - 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 974d75d60b203..a3e6b1c3aebd3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -57,6 +57,7 @@ import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; import org.apache.pulsar.broker.loadbalance.ModularLoadManager; import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; +import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -187,6 +188,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private final Lock lock = new ReentrantLock(); private final Set knownBrokers = new HashSet<>(); private Map bundleBrokerAffinityMap; + // array used for sorting and select topK bundles + private final List> bundleArr = new ArrayList<>(); + /** * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called. @@ -1122,6 +1126,32 @@ public void writeBrokerDataOnZooKeeper(boolean force) { } } + /** + * sort bundles by load and select topK bundles for each broker. + * @return the number of bundles selected + */ + private int selectTopKBundle() { + bundleArr.clear(); + bundleArr.addAll(loadData.getBundleData().entrySet()); + + int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration() + .getLoadBalancerMaxNumberOfBundlesInBundleLoadReport(); + if (maxNumberOfBundlesInBundleLoadReport <= 0) { + // select all bundle + return bundleArr.size(); + } else { + // select topK bundle for each broker, so select topK * brokerCount bundle in total + int brokerCount = Math.max(1, loadData.getBrokerData().size()); + int updateBundleCount = Math.min(maxNumberOfBundlesInBundleLoadReport * brokerCount, bundleArr.size()); + if (updateBundleCount == 0) { + // no bundle to update + return 0; + } + TopKBundles.partitionSort(bundleArr, updateBundleCount); + return updateBundleCount; + } + } + /** * As the leader broker, write bundle data aggregated from all brokers to metadata store. */ @@ -1131,11 +1161,12 @@ public void writeBundleDataOnZooKeeper() { // Write the bundle data to metadata store. List> futures = new ArrayList<>(); - for (Map.Entry entry : loadData.getBundleData().entrySet()) { - final String bundle = entry.getKey(); - final BundleData data = entry.getValue(); - futures.add( - pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle, data)); + // use synchronized to protect bundleArr. + synchronized (bundleArr) { + int updateBundleCount = selectTopKBundle(); + bundleArr.stream().limit(updateBundleCount).forEach(entry -> futures.add( + pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData( + entry.getKey(), (BundleData) entry.getValue()))); } // Write the time average broker data to metadata store. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 1f9cd806e19b5..20a33a70bfa40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -40,6 +40,7 @@ import java.lang.reflect.Method; import java.net.URL; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -425,6 +426,61 @@ public void testMaxTopicDistributionToBroker() throws Exception { } } + /** + * It verifies that the load-manager of leader broker only write topK * brokerCount bundles to zk. + */ + @Test + public void testFilterBundlesWhileWritingToMetadataStore() throws Exception { + Map pulsarServices = new HashMap<>(); + pulsarServices.put(pulsar1.getWebServiceAddress(), pulsar1); + pulsarServices.put(pulsar2.getWebServiceAddress(), pulsar2); + MetadataCache metadataCache = pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class); + String protocol = "http://"; + PulsarService leaderBroker = pulsarServices.get(protocol + pulsar1.getLeaderElectionService().getCurrentLeader().get().getBrokerId()); + ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) getField( + leaderBroker.getLoadManager().get(), "loadManager"); + int topK = 1; + leaderBroker.getConfiguration().setLoadBalancerMaxNumberOfBundlesInBundleLoadReport(topK); + // there are two broker in cluster, so total bundle count will be topK * 2 + int exportBundleCount = topK * 2; + + // create and configure bundle-data + final int totalBundles = 5; + final NamespaceBundle[] bundles = LoadBalancerTestingUtils.makeBundles( + nsFactory, "test", "test", "test", totalBundles); + LoadData loadData = (LoadData) getField(loadManager, "loadData"); + for (int i = 0; i < totalBundles; i++) { + final BundleData bundleData = new BundleData(10, 1000); + final String bundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[i]); + final TimeAverageMessageData longTermMessageData = new TimeAverageMessageData(1000); + longTermMessageData.setMsgThroughputIn(1000 * i); + longTermMessageData.setMsgThroughputOut(1000 * i); + longTermMessageData.setMsgRateIn(1000 * i); + longTermMessageData.setNumSamples(1000); + bundleData.setLongTermData(longTermMessageData); + loadData.getBundleData().put(bundles[i].toString(), bundleData); + loadData.getBrokerData().get(leaderBroker.getWebServiceAddress().substring(protocol.length())) + .getLocalData().getLastStats().put(bundles[i].toString(), new NamespaceBundleStats()); + metadataCache.create(bundleDataPath, bundleData).join(); + } + for (int i = 0; i < totalBundles; i++) { + final String bundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[i]); + assertEquals(metadataCache.getWithStats(bundleDataPath).get().get().getStat().getVersion(), 0); + } + + // update bundle data to zk and verify + loadManager.writeBundleDataOnZooKeeper(); + int filterBundleCount = totalBundles - exportBundleCount; + for (int i = 0; i < filterBundleCount; i++) { + final String bundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[i]); + assertEquals(metadataCache.getWithStats(bundleDataPath).get().get().getStat().getVersion(), 0); + } + for (int i = filterBundleCount; i < totalBundles; i++) { + final String bundleDataPath = String.format("%s/%s", BUNDLE_DATA_BASE_PATH, bundles[i]); + assertEquals(metadataCache.getWithStats(bundleDataPath).get().get().getStat().getVersion(), 1); + } + } + // Test that load shedding works @Test public void testLoadShedding() throws Exception { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BundleData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BundleData.java index e5e32046e4970..3c03b7b79bc07 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BundleData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/BundleData.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.policies.data.loadbalancer; +import lombok.EqualsAndHashCode; + /** * Data class comprising the short term and long term historical data for this bundle. */ -public class BundleData { +@EqualsAndHashCode +public class BundleData implements Comparable { // Short term data for this bundle. The time frame of this data is // determined by the number of short term samples // and the bundle update period. @@ -103,4 +106,13 @@ public int getTopics() { public void setTopics(int topics) { this.topics = topics; } + + @Override + public int compareTo(BundleData o) { + int result = this.shortTermData.compareTo(o.shortTermData); + if (result == 0) { + result = this.longTermData.compareTo(o.longTermData); + } + return result; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java index 777a6684ce81e..b9c7a43c3a7a0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.policies.data.loadbalancer; +import lombok.EqualsAndHashCode; + /** * Data class comprising the average message data over a fixed period of time. */ -public class TimeAverageMessageData { +@EqualsAndHashCode +public class TimeAverageMessageData implements Comparable { // The maximum number of samples this data will consider. private int maxSamples; @@ -41,6 +44,11 @@ public class TimeAverageMessageData { // The average message rate out per second. private double msgRateOut; + // Consider the throughput equal if difference is less than 100 KB/s + private static final double throughputDifferenceThreshold = 1e5; + // Consider the msgRate equal if the difference is less than 100 + private static final double msgRateDifferenceThreshold = 100; + // For JSON only. public TimeAverageMessageData() { } @@ -177,4 +185,40 @@ public double totalMsgRate() { public double totalMsgThroughput() { return msgThroughputIn + msgThroughputOut; } + + @Override + public int compareTo(TimeAverageMessageData other) { + int result = this.compareByBandwidthIn(other); + + if (result == 0) { + result = this.compareByBandwidthOut(other); + } + if (result == 0) { + result = this.compareByMsgRate(other); + } + return result; + } + + public int compareByMsgRate(TimeAverageMessageData other) { + double thisMsgRate = this.msgRateIn + this.msgRateOut; + double otherMsgRate = other.msgRateIn + other.msgRateOut; + if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) { + return Double.compare(thisMsgRate, otherMsgRate); + } + return 0; + } + + public int compareByBandwidthIn(TimeAverageMessageData other) { + if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) { + return Double.compare(this.msgThroughputIn, other.msgThroughputIn); + } + return 0; + } + + public int compareByBandwidthOut(TimeAverageMessageData other) { + if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) { + return Double.compare(this.msgThroughputOut, other.msgThroughputOut); + } + return 0; + } }