Add integration tests and fix issues
junkaixue committed Feb 13, 2024
1 parent 332d17a commit ebc4ae2
Showing 3 changed files with 99 additions and 4 deletions.
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
@@ -38,6 +39,7 @@
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.MissingTopStateRecord;
@@ -177,6 +179,14 @@ public synchronized void refresh(HelixDataAccessor accessor) {

if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
// Remove all cached IdealStates computed by the greedy strategy. The greedy assignment is a
// global computation that cannot be partially reused for a subset of resources, and it is
// cheap enough to recompute from scratch.
Set<String> cachedGreedyIdealStates = _idealMappingCache.values().stream().filter(
record -> record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.collect(Collectors.toSet());
_idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
}

LogUtil.logInfo(logger, getClusterEventId(), String.format(
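For orientation, the sketch below shows the kind of per-instance capacity tracker that buildSimpleCapacityMap presumably populates. It is a hypothetical, simplified stand-in, not the real org.apache.helix.controller.common.CapacityNode; the greedy strategy in the next file only relies on getId() and on canAdd(resource, partition) enforcing the global per-instance limit.

import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified capacity tracker; NOT the real Helix CapacityNode.
public class SimpleCapacityNode {
  private final String _id;
  private final int _maxPartitions; // globalMaxPartitionAllowedPerInstance
  private final Set<String> _assigned = new HashSet<>();

  public SimpleCapacityNode(String id, int maxPartitions) {
    _id = id;
    _maxPartitions = maxPartitions;
  }

  public String getId() {
    return _id;
  }

  // Assumption: canAdd both checks capacity and records the replica when it fits,
  // so the caller can add the node to a preference list right after a true result.
  public boolean canAdd(String resource, String partition) {
    if (_assigned.size() >= _maxPartitions) {
      return false;
    }
    return _assigned.add(resource + "_" + partition);
  }
}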
@@ -65,24 +65,27 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
return znRecord;
}

// Sort the assignable nodes by id
List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet());
Collections.sort(assignableNodes, Comparator.comparing(CapacityNode::getId));

// Assign partitions to nodes in order.
for (int i = 0, index = 0; i < _partitions.size(); i++) {
int startIndex = index;
List<String> preferenceList = new ArrayList<>();
for (int j = 0; j < numReplicas; j++) {
if (index - startIndex >= assignableNodes.size()) {
logger.warn("No enough assignable nodes for resource: " + _resourceName);
break;
}
while (index - startIndex < assignableNodes.size()) {
CapacityNode node = assignableNodes.get(index++ % assignableNodes.size());
if (node.canAdd(_resourceName, _partitions.get(i))) {
preferenceList.add(node.getId());
break;
}
}

if (index - startIndex >= assignableNodes.size()) {
// If all nodes have been tried, no node can accept this replica.
logger.warn("Not enough assignable nodes for resource: " + _resourceName);
}
}
znRecord.setListField(_partitions.get(i), preferenceList);
}
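Condensed into a standalone form, the assignment loop above behaves as in the sketch below. It reuses the hypothetical SimpleCapacityNode from the earlier sketch rather than the real Helix classes; the shared cursor and the per-partition probe bound mirror the code in the diff, so a full node is skipped instead of aborting the whole replica.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the greedy round-robin assignment with skip-full-node behavior.
public class GreedyAssignmentSketch {
  static Map<String, List<String>> computeAssignment(String resource, List<String> partitions,
      int numReplicas, List<SimpleCapacityNode> nodes) {
    Map<String, List<String>> assignment = new HashMap<>();
    int index = 0; // shared cursor so load spreads round-robin across partitions
    for (String partition : partitions) {
      int startIndex = index;
      List<String> preferenceList = new ArrayList<>();
      for (int j = 0; j < numReplicas; j++) {
        // Keep probing nodes until one accepts the replica or every node
        // has been tried once for this partition.
        while (index - startIndex < nodes.size()) {
          SimpleCapacityNode node = nodes.get(index++ % nodes.size());
          if (node.canAdd(resource, partition)) {
            preferenceList.add(node.getId());
            break;
          }
        }
      }
      assignment.put(partition, preferenceList);
    }
    return assignment;
  }
}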
@@ -0,0 +1,82 @@
package org.apache.helix.integration.rebalancer;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends TaskTestBase {

@BeforeClass
public void beforeClass() throws Exception {
_numNodes = 10;
_numReplicas = 2;
_numDbs = 1;
_numPartitions = 4;
super.beforeClass();
}

@AfterClass
public void afterClass() throws Exception {
/*
* shutdown order: 1) disconnect the controller 2) disconnect participants
*/
_controller.syncStop();
for (MockParticipantManager participant : _participants) {
participant.syncStop();
}
deleteCluster(CLUSTER_NAME);
System.out.println("END " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testGreedyRebalanceWithGlobalPerInstancePartitionLimit() throws InterruptedException {
// Update the cluster config and switch the resource to the greedy rebalance strategy
ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
IdealState idealState = _gSetupTool.getClusterManagementTool()
.getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
idealState.setRebalanceStrategy(
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
_gSetupTool.getClusterManagementTool()
.setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState);
Assert.assertTrue(_clusterVerifier.verifyByPolling());

_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB", 2, "OnlineOffline",
IdealState.RebalanceMode.FULL_AUTO.name(),
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
_gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1);
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// Build a map of instance -> number of assigned partitions across both resources
ExternalView TGTDBView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
ExternalView newDBView =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, "NewDB");
Map<String, Integer> instancePartitionCountMap = new HashMap<>();
TGTDBView.getPartitionSet().stream()
.forEach(partition -> TGTDBView.getStateMap(partition).keySet().forEach(instance -> {
instancePartitionCountMap.put(instance,
instancePartitionCountMap.getOrDefault(instance, 0) + 1);
}));
newDBView.getPartitionSet().stream()
.forEach(partition -> newDBView.getStateMap(partition).keySet().forEach(instance -> {
instancePartitionCountMap.put(instance,
instancePartitionCountMap.getOrDefault(instance, 0) + 1);
}));

Assert.assertEquals(
instancePartitionCountMap.values().stream().filter(count -> count != 1).count(), 0);
}
}
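Outside of the test harness, the same setup can be applied with Helix's public admin and config APIs. The sketch below mirrors the calls exercised by the test; the ZooKeeper address, cluster name, and resource name are placeholders, not values from this commit.

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;

public class GreedyLimitSetupSketch {
  public static void main(String[] args) {
    String zkAddr = "localhost:2181";  // placeholder
    String clusterName = "MyCluster";  // placeholder

    // Cap every instance at one partition replica cluster-wide.
    ConfigAccessor configAccessor = new ConfigAccessor(zkAddr);
    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    clusterConfig.setGlobalMaxPartitionAllowedPerInstance(1);
    configAccessor.setClusterConfig(clusterName, clusterConfig);

    // Create a FULL_AUTO resource that uses the greedy strategy, then rebalance it.
    HelixAdmin admin = new ZKHelixAdmin(zkAddr);
    admin.addResource(clusterName, "MyDB", 4, "OnlineOffline",
        IdealState.RebalanceMode.FULL_AUTO.name(),
        "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
    admin.rebalance(clusterName, "MyDB", 2);
  }
}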
