Skip to content

Commit

Permalink
[fix][broker] Fix resource_quota_zpath (apache#21461)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy authored Nov 16, 2023
1 parent 2322004 commit 403faa4
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
public static final int NUM_SHORT_SAMPLES = 10;

// Path to ZNode whose children contain ResourceQuota jsons.
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota";

// Set of broker candidates to reuse so that object creation is avoided.
private final Set<String> brokerCandidateCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -99,6 +101,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -785,6 +788,55 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception {
assertEquals(data.size(), 1);
}

@DataProvider(name = "isV1")
public Object[][] isV1() {
return new Object[][] {{true}, {false}};
}

@Test(dataProvider = "isV1")
public void testBundleDataDefaultValue(boolean isV1) throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "my-ns";
NamespaceName ns = isV1 ? NamespaceName.get(tenant, cluster, namespace) : NamespaceName.get(tenant, namespace);
admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(ns.toString(), 16);

// set resourceQuota to the first bundle range.
BundlesData bundlesData = admin1.namespaces().getBundles(ns.toString());
NamespaceBundle namespaceBundle = nsFactory.getBundle(ns,
Range.range(Long.decode(bundlesData.getBoundaries().get(0)), BoundType.CLOSED, Long.decode(bundlesData.getBoundaries().get(1)),
BoundType.OPEN));
ResourceQuota quota = new ResourceQuota();
quota.setMsgRateIn(1024.1);
quota.setMsgRateOut(1024.2);
quota.setBandwidthIn(1024.3);
quota.setBandwidthOut(1024.4);
quota.setMemory(1024.0);
admin1.resourceQuotas().setNamespaceBundleResourceQuota(ns.toString(), namespaceBundle.getBundleRange(), quota);

ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager();

// get the bundleData of the first bundle range.
// The default value of the bundleData be the same as resourceQuota because the resourceQuota is present.
BundleData defaultBundleData = lm.getBundleDataOrDefault(namespaceBundle.toString());

TimeAverageMessageData shortTermData = defaultBundleData.getShortTermData();
TimeAverageMessageData longTermData = defaultBundleData.getLongTermData();
assertEquals(shortTermData.getMsgRateIn(), 1024.1);
assertEquals(shortTermData.getMsgRateOut(), 1024.2);
assertEquals(shortTermData.getMsgThroughputIn(), 1024.3);
assertEquals(shortTermData.getMsgThroughputOut(), 1024.4);

assertEquals(longTermData.getMsgRateIn(), 1024.1);
assertEquals(longTermData.getMsgRateOut(), 1024.2);
assertEquals(longTermData.getMsgThroughputIn(), 1024.3);
assertEquals(longTermData.getMsgThroughputOut(), 1024.4);
}


@Test
public void testRemoveNonExistBundleData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
*/
public class LoadSimulationController {
private static final Logger log = LoggerFactory.getLogger(LoadSimulationController.class);
private static final String QUOTA_ROOT = "/loadbalance/resource-quota/namespace";
private static final String QUOTA_ROOT = "/loadbalance/resource-quota";

// Input streams for each client to send commands through.
private final DataInputStream[] inputStreams;
Expand Down

0 comments on commit 403faa4

Please sign in to comment.