Skip to content

Commit

Permalink
fix endless node creation
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Nov 26, 2024
1 parent d8366de commit 2690482
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,28 +350,28 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
}

/**
* Merge entries from currentResourceAssignment to newAssignment.
* Merge entries from currentAssignment to newAssignment.
* To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
* could miss out current partition allocation still on offline instances (within delayed window).
* The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
* from newAssignment to currentResourceAssignment
* @param newAssignment newAssignment to merge and may override currentResourceAssignment
* @param currentResourceAssignment the current resource assignment, this map is getting updated during this method.
* from currentAssignment to newAssignment
* @param currentAssignment the current resource assignment; its entries are merged into newAssignment and may override entries already there.
* @param newAssignment the newly computed assignment; this map is updated in place during this method.
*/
public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
Map<String, ResourceAssignment> currentResourceAssignment) {
newAssignment.entrySet().parallelStream().forEach(entry -> {
public static void mergeAssignments(Map<String, ResourceAssignment> currentAssignment,
Map<String, ResourceAssignment> newAssignment) {
currentAssignment.entrySet().parallelStream().forEach(entry -> {
String resourceName = entry.getKey();
ResourceAssignment assignment = entry.getValue();
if (!currentResourceAssignment.containsKey(resourceName)) {
currentResourceAssignment.put(resourceName, assignment);
if (!newAssignment.containsKey(resourceName)) {
newAssignment.put(resourceName, assignment);
} else {
for (Partition partition : assignment.getMappedPartitions()) {
Map<String, String> toMerge =
new HashMap<>(currentResourceAssignment.get(resourceName).getReplicaMap(partition));
new HashMap<>(newAssignment.get(resourceName).getReplicaMap(partition));
assignment.getReplicaMap(partition).forEach((key, value) -> {
toMerge.put(key, value);
currentResourceAssignment.get(resourceName).addReplicaMap(partition, toMerge);
newAssignment.get(resourceName).addReplicaMap(partition, toMerge);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,8 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
Map<String, ResourceAssignment> assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
// keep only the resource entries requiring changes for minActiveReplica
assignment.keySet().retainAll(clusterModel.getAssignableReplicaMap().keySet());
DelayedRebalanceUtil.mergeAssignments(assignment, currentResourceAssignment);
return currentResourceAssignment;
DelayedRebalanceUtil.mergeAssignments(currentResourceAssignment, assignment);
return assignment;
} catch (HelixRebalanceException e) {
LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName());
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package org.apache.helix.integration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * Integration test guarding against unbounded growth of persisted best-possible assignment
 * nodes (see https://github.com/apache/helix/issues/2891).
 *
 * <p>The test runs a WAGED-rebalanced resource with delayed rebalance and a minimum number of
 * active replicas, pushes the cluster into and out of maintenance mode, and then checks how many
 * children exist under the cluster's {@code ASSIGNMENT_METADATA/BEST_POSSIBLE} ZooKeeper path.
 */
public class TestEndlessBestPossibleNodes extends ZkTestBase {

  public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
  public static int PARTICIPANT_COUNT = 12;
  public static List<MockParticipantManager> _participants = new ArrayList<>();
  public static ClusterControllerManager _controller;
  public static ConfigAccessor _configAccessor;

  /**
   * Creates the cluster, starts {@link #PARTICIPANT_COUNT} participants (the last one is
   * disabled right after it joins), starts a controller, and configures capacity keys,
   * delayed rebalance, the max-offline-instances threshold and best-possible persistence.
   */
  @BeforeClass
  public void beforeClass() {
    System.out.println("Start test " + TestHelper.getTestClassName());
    _gSetupTool.addCluster(CLUSTER_NAME, true);
    for (int i = 0; i < PARTICIPANT_COUNT; i++) {
      addParticipant("localhost_" + i);
      if (i == PARTICIPANT_COUNT - 1) {
        _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, "localhost_" + i, false);
      }
    }

    String controllerName = CONTROLLER_PREFIX + "_0";
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
    _controller.syncStart();

    _configAccessor = new ConfigAccessor(_gZkClient);
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
    String testCapacityKey = "TestCapacityKey";
    clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 10000));
    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
    clusterConfig.setDelayRebalaceEnabled(true);
    clusterConfig.setRebalanceDelayTime(57600000);
    clusterConfig.setMaxOfflineInstancesAllowed(10);
    clusterConfig.setPersistBestPossibleAssignment(true);
    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
  }

  /**
   * Stops everything this class started and removes the cluster, so the static state and the
   * running managers do not leak into tests that run afterwards.
   */
  @AfterClass
  public void afterClass() {
    if (_controller != null) {
      _controller.syncStop();
    }
    for (MockParticipantManager participant : _participants) {
      participant.syncStop();
    }
    // The list is static; clear it so a later run in the same JVM starts from an empty list.
    _participants.clear();
    // Dropped last, once no participant or controller started here is still connected.
    _gSetupTool.getClusterManagementTool().dropCluster(CLUSTER_NAME);
  }

  // This test was constructed to capture the bug described in issue 2891
  // https://github.com/apache/helix/issues/2891
  @Test
  public void testEndlessBestPossibleNodes() throws Exception {
    int numPartition = 10;
    BestPossibleExternalViewVerifier verifier =
        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();

    // Create 1 WAGED Resource
    String firstDB = "InPlaceMigrationTestDB3";
    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
        IdealState.RebalanceMode.FULL_AUTO.name(), null);
    IdealState idealStateOne =
        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
    idealStateOne.setMinActiveReplicas(2);
    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
    Assert.assertTrue(verifier.verifyByPolling());

    // Disable all instances
    for (MockParticipantManager participantManager : _participants) {
      _gSetupTool.getClusterManagementTool()
          .enableInstance(CLUSTER_NAME, participantManager.getInstanceName(), false);
    }

    // Add new instances
    for (int i = 0; i < 4; i++) {
      String instanceName = "newInstance_" + i;
      addParticipant(instanceName);
    }
    // Sleep to let pipeline run and auto enter maintenance mode
    Thread.sleep(1000);

    // Enable instances to no longer exceed max offline and cluster no longer auto enters maintenance mode
    for (int i = 0; i < 3; i++) {
      _gSetupTool.getClusterManagementTool()
          .enableInstance(CLUSTER_NAME, _participants.get(i).getInstanceName(), true);
    }

    // Sleep to let pipeline run then disable maintenance mode
    Thread.sleep(1000);
    _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "gspencer", null);

    // Sleep to let best possible nodes be created. A fixed wait (rather than polling) is
    // intentional here: the assertion below is an upper bound, so the pipeline must be given
    // time to create excess nodes if the bug is present.
    Thread.sleep(5000);

    int childCount = _gZkClient.getChildren("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/BEST_POSSIBLE").size();
    Assert.assertTrue(childCount > 0);
    Assert.assertTrue(childCount < 5, "Child count was " + childCount);
  }

  /**
   * Adds an instance to the cluster, starts a mock participant for it, and records the
   * participant in {@link #_participants}.
   *
   * @param instanceName name of the instance to add and start
   * @return the started participant
   */
  public MockParticipantManager addParticipant(String instanceName) {
    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
    participant.syncStart();
    _participants.add(participant);
    return participant;
  }
}

0 comments on commit 2690482

Please sign in to comment.