diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index d45aa971bb..46247c98bc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -350,28 +350,28 @@ public static Set findToBeAssignedReplicasForMinActiveReplica } /** - * Merge entries from currentResourceAssignment to newAssignment. + * Merge entries from currentAssignment to newAssignment. * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but * could miss out current partition allocation still on offline instances (within delayed window). * The merge process is independent for each resource; for each resource-partition, it adds the pair - * from newAssignment to currentResourceAssignment - * @param newAssignment newAssignment to merge and may override currentResourceAssignment - * @param currentResourceAssignment the current resource assignment, this map is getting updated during this method. + * from currentAssignment to newAssignment + * @param currentAssignment the current resource assignment, its entries are merged into newAssignment and may override them. + * @param newAssignment the newly computed assignment, this map is getting updated during this method. 
*/ - public static void mergeAssignments(Map newAssignment, - Map currentResourceAssignment) { - newAssignment.entrySet().parallelStream().forEach(entry -> { + public static void mergeAssignments(Map currentAssignment, + Map newAssignment) { + currentAssignment.entrySet().parallelStream().forEach(entry -> { String resourceName = entry.getKey(); ResourceAssignment assignment = entry.getValue(); - if (!currentResourceAssignment.containsKey(resourceName)) { - currentResourceAssignment.put(resourceName, assignment); + if (!newAssignment.containsKey(resourceName)) { + newAssignment.put(resourceName, assignment); } else { for (Partition partition : assignment.getMappedPartitions()) { Map toMerge = - new HashMap<>(currentResourceAssignment.get(resourceName).getReplicaMap(partition)); + new HashMap<>(newAssignment.get(resourceName).getReplicaMap(partition)); assignment.getReplicaMap(partition).forEach((key, value) -> { toMerge.put(key, value); - currentResourceAssignment.get(resourceName).addReplicaMap(partition, toMerge); + newAssignment.get(resourceName).addReplicaMap(partition, toMerge); }); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 39a197bff5..c1920ba5d0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -417,8 +417,8 @@ private Map handleDelayedRebalanceMinActiveReplica( Map assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm); // keep only the resource entries requiring changes for minActiveReplica assignment.keySet().retainAll(clusterModel.getAssignableReplicaMap().keySet()); - DelayedRebalanceUtil.mergeAssignments(assignment, currentResourceAssignment); - return currentResourceAssignment; + 
DelayedRebalanceUtil.mergeAssignments(currentResourceAssignment, assignment); + return assignment; } catch (HelixRebalanceException e) { LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}", clusterData.getClusterName()); throw e; diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java b/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java new file mode 100644 index 0000000000..0c2a76f428 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java @@ -0,0 +1,114 @@ +package org.apache.helix.integration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestEndlessBestPossibleNodes extends ZkTestBase { + + public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster"; + public static int PARTICIPANT_COUNT = 12; + public static List _participants = new ArrayList<>(); + public static ClusterControllerManager _controller; + public static ConfigAccessor _configAccessor; + + @BeforeClass + public void beforeClass() { + System.out.println("Start test " + TestHelper.getTestClassName()); + _gSetupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < PARTICIPANT_COUNT; i++) { + addParticipant("localhost_" + i); + if (i == PARTICIPANT_COUNT-1) { + 
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, "localhost_" + i, false); + } + } + + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _configAccessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + String testCapacityKey = "TestCapacityKey"; + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey)); + clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 10000)); + clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1)); + clusterConfig.setDelayRebalaceEnabled(true); + clusterConfig.setRebalanceDelayTime(57600000); + clusterConfig.setMaxOfflineInstancesAllowed(10); + clusterConfig.setPersistBestPossibleAssignment(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + + // This test was constructed to capture the bug described in issue 2891 + // https://github.com/apache/helix/issues/2891 + @Test + public void testEndlessBestPossibleNodes() throws Exception { + int numPartition = 10; + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + + // Create 1 WAGED Resource + String firstDB = "InPlaceMigrationTestDB3"; + _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO.name(), null); + IdealState idealStateOne = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB); + idealStateOne.setMinActiveReplicas(2); + idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne); + 
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3); + Assert.assertTrue(verifier.verifyByPolling()); + + // Disable all instances + for (MockParticipantManager participantManager : _participants) { + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, participantManager.getInstanceName(), false); + } + + // Add new instances + for (int i = 0; i < 4; i++) { + String instanceName = "newInstance_" + i; + addParticipant(instanceName); + } + // Sleep to let pipeline run and auto enter maintenance mode + Thread.sleep(1000); + + // Enable instances to no longer exceed max offline and cluster no longer auto enters maintenance mode + for (int i = 0 ; i < 3; i++) { + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants.get(i).getInstanceName(), true); + } + + // Sleep to let pipeline run then disable maintenance mode + Thread.sleep(1000); + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "gspencer", null); + + // Sleep to let best possible nodes be created + Thread.sleep(5000); + + int childCount = _gZkClient.getChildren("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/BEST_POSSIBLE").size(); + Assert.assertTrue(childCount > 0); + Assert.assertTrue(childCount < 5, "Child count was " + childCount); + } + + public MockParticipantManager addParticipant(String instanceName) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName); + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + participant.syncStart(); + _participants.add(participant); + return participant; + } +}