Support Simple Greedy Rebalance Strategy
Support a Simple Greedy Rebalance Strategy for round-robin assignment. It also supports a global
limit on how many partitions may be assigned to each instance.
junkaixue committed Feb 12, 2024
1 parent 1ef5053 commit 332d17a
Showing 5 changed files with 320 additions and 0 deletions.
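The APIs added in this commit suggest the following way to turn the feature on. This is an illustrative sketch only: "MyCluster" and "MyResource" are placeholder names, and configAccessor/admin are assumed to be existing ConfigAccessor and HelixAdmin handles; the wiring itself is not part of this change.

// Illustrative sketch, not part of this commit.
ClusterConfig clusterConfig = configAccessor.getClusterConfig("MyCluster");
clusterConfig.setGlobalMaxPartitionAllowedPerInstance(10);   // global cap: 10 partitions per instance
configAccessor.setClusterConfig("MyCluster", clusterConfig);

IdealState idealState = admin.getResourceIdealState("MyCluster", "MyResource");
idealState.setRebalanceStrategy(
    "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
admin.setResourceIdealState("MyCluster", "MyResource", idealState);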
@@ -0,0 +1,91 @@
package org.apache.helix.controller.common;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * A CapacityNode tracks the capacity of an instance and the partitions already assigned to it,
 * so it can decide whether it can accept additional partitions.
 */
public class CapacityNode {
  private int _currentlyAssigned;
  private int _capacity;
  private final String _id;
  private final Map<String, Set<String>> _partitionMap;

  public CapacityNode(String id) {
    _partitionMap = new HashMap<>();
    _currentlyAssigned = 0;
    this._id = id;
  }

  /**
   * Check whether the given replica can legally be added to this node and, if so, record the
   * assignment.
   *
   * @param resource The resource to assign
   * @param partition The partition to assign
   * @return true if the assignment was made, false otherwise
   */
  public boolean canAdd(String resource, String partition) {
    if (_currentlyAssigned >= _capacity || (_partitionMap.containsKey(resource)
        && _partitionMap.get(resource).contains(partition))) {
      return false;
    }
    _partitionMap.computeIfAbsent(resource, k -> new HashSet<>()).add(partition);
    _currentlyAssigned++;
    return true;
  }

  /**
   * Set the capacity of this node
   * @param capacity The capacity to set
   */
  public void setCapacity(int capacity) {
    _capacity = capacity;
  }

  /**
   * Get the ID of this node
   * @return The ID of this node
   */
  public String getId() {
    return _id;
  }

  /**
   * Get the number of partitions currently assigned to this node
   * @return The number of partitions currently assigned to this node
   */
  public int getCurrentlyAssigned() {
    return _currentlyAssigned;
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("##########\nname=").append(_id).append("\nassigned:").append(_currentlyAssigned)
        .append("\ncapacity:").append(_capacity);
    return sb.toString();
  }
}
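A minimal usage sketch of the class above (resource and partition names are made up); note that canAdd both checks capacity and records the assignment when it succeeds:

// Illustrative sketch, not part of the commit.
CapacityNode node = new CapacityNode("instance-0");
node.setCapacity(2);
boolean first = node.canAdd("MyDB", "MyDB_0");      // true: capacity available
boolean duplicate = node.canAdd("MyDB", "MyDB_0");  // false: this replica is already recorded
boolean second = node.canAdd("MyDB", "MyDB_1");     // true: fills the node (2 of 2)
boolean overflow = node.canAdd("MyDB", "MyDB_2");   // false: capacity reached
System.out.println(node);                           // prints name, assigned count and capacity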
@@ -36,6 +36,7 @@
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
@@ -81,6 +82,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
  // Maintain a set of all ChangeTypes for change detection
  private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
  private Set<String> _aggregationEnabledTypes = new HashSet<>();
  private Set<CapacityNode> _simpleCapacitySet;


  // CrushEd strategy needs to have a stable partition list input. So this cached list persist the
  // previous seen partition lists. If the members in a list are not modified, the old list will be
@@ -172,6 +175,10 @@ public synchronized void refresh(HelixDataAccessor accessor) {
    // TODO: impacting user's clusters.
    refreshStablePartitionList(getIdealStates());

    if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
      buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
    }

    LogUtil.logInfo(logger, getClusterEventId(), String.format(
        "END: ResourceControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline",
        getClusterName(), startTime, System.currentTimeMillis() - startTime, getPipelineName()));
@@ -521,4 +528,17 @@ public boolean checkAndReduceCapacity(String instance, String resourceName, Stri
  public WagedInstanceCapacity getWagedInstanceCapacity() {
    return _wagedInstanceCapacity;
  }

  private void buildSimpleCapacityMap(int globalMaxPartitionAllowedPerInstance) {
    _simpleCapacitySet = new HashSet<>();
    for (String instance : getEnabledLiveInstances()) {
      CapacityNode capacityNode = new CapacityNode(instance);
      capacityNode.setCapacity(globalMaxPartitionAllowedPerInstance);
      _simpleCapacitySet.add(capacityNode);
    }
  }

  public Set<CapacityNode> getSimpleCapacitySet() {
    return _simpleCapacitySet;
  }
}
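The simple capacity set is rebuilt on every refresh from the currently enabled live instances, so each CapacityNode starts empty. A hedged sketch of inspecting it (dataProvider is an assumed handle to the data provider above, and the instance names are made up):

// Illustrative sketch, not part of the commit: after a refresh with the limit set to 10 on a
// cluster whose enabled live instances are "host-1" and "host-2", each node is at 0 of 10.
for (CapacityNode node : dataProvider.getSimpleCapacitySet()) {
  System.out.println(node.getId() + " -> " + node.getCurrentlyAssigned() + "/10");
}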
@@ -0,0 +1,100 @@
package org.apache.helix.controller.rebalancer.strategy;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GreedyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> {
  private static Logger logger = LoggerFactory.getLogger(GreedyRebalanceStrategy.class);
  private String _resourceName;
  private List<String> _partitions;
  private LinkedHashMap<String, Integer> _states;

  public GreedyRebalanceStrategy() {
  }

  @Override
  public void init(String resourceName, final List<String> partitions,
      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
    _resourceName = resourceName;
    _partitions = partitions;
    _states = states;
  }

  @Override
  public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
      final Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
    int numReplicas = countStateReplicas();
    ZNRecord znRecord = new ZNRecord(_resourceName);
    if (liveNodes.size() == 0) {
      return znRecord;
    }

    if (clusterData.getSimpleCapacitySet() == null) {
      logger.warn("No capacity set for resource: " + _resourceName);
      return znRecord;
    }

    // Sort by node id so the greedy scan is deterministic across runs.
    List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet());
    Collections.sort(assignableNodes, Comparator.comparing(CapacityNode::getId));

    for (int i = 0, index = 0; i < _partitions.size(); i++) {
      int startIndex = index;
      List<String> preferenceList = new ArrayList<>();
      for (int j = 0; j < numReplicas; j++) {
        if (index - startIndex >= assignableNodes.size()) {
          // Every node has already been tried for this partition; no capacity is left.
          logger.warn("Not enough assignable nodes for resource: " + _resourceName);
          break;
        }
        // Continue the round-robin scan from where the previous replica stopped, skipping
        // nodes that are full or already host a replica of this partition.
        while (index - startIndex < assignableNodes.size()) {
          CapacityNode node = assignableNodes.get(index++ % assignableNodes.size());
          if (node.canAdd(_resourceName, _partitions.get(i))) {
            preferenceList.add(node.getId());
            break;
          }
        }
      }
      znRecord.setListField(_partitions.get(i), preferenceList);
    }

    return znRecord;
  }

  private int countStateReplicas() {
    int total = 0;
    for (Integer count : _states.values()) {
      total += count;
    }
    return total;
  }
}
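To make the loop above concrete, here is a hedged sketch of a standalone check that mirrors the Mockito-based test further down (node, resource, and partition names are made up; imports are assumed). With 3 nodes of capacity 3 and 4 partitions at 2 replicas each, the scan keeps rotating through the sorted node list, which should yield an even 3/3/2 spread:

// Illustrative sketch, not part of the commit.
Set<CapacityNode> nodes = new HashSet<>();
for (int i = 0; i < 3; i++) {
  CapacityNode node = new CapacityNode("Node-" + i);
  node.setCapacity(3);                           // at most 3 replicas per node
  nodes.add(node);
}
ResourceControllerDataProvider cache = Mockito.mock(ResourceControllerDataProvider.class);
Mockito.when(cache.getSimpleCapacitySet()).thenReturn(nodes);

LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
states.put("MASTER", 1);
states.put("SLAVE", 1);                          // 2 replicas per partition in total

List<String> partitions = Arrays.asList("DB_0", "DB_1", "DB_2", "DB_3");
GreedyRebalanceStrategy strategy = new GreedyRebalanceStrategy();
strategy.init("DB", partitions, states, 3);
ZNRecord assignment =
    strategy.computePartitionAssignment(null, Collections.singletonList("Node-0"), null, cache);

// Given the deterministic sort and round-robin scan, the preference lists should be:
// DB_0 -> [Node-0, Node-1], DB_1 -> [Node-2, Node-0],
// DB_2 -> [Node-1, Node-2], DB_3 -> [Node-0, Node-1]
System.out.println(assignment.getListFields());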
24 changes: 24 additions & 0 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -70,6 +70,10 @@ public enum ClusterConfigProperty {

    // The following concerns maintenance mode
    MAX_PARTITIONS_PER_INSTANCE,
    // The maximum number of partitions that an instance can serve in this cluster.
    // This only works for GreedyRebalanceStrategy.
    // TODO: if we want to support this for other rebalancers, we need to implement that logic
    GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
    // The following two include offline AND disabled instances
    MAX_OFFLINE_INSTANCES_ALLOWED,
    NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT, // For auto-exiting maintenance mode
@@ -511,6 +515,26 @@ public int getMaxPartitionsPerInstance() {
    return _record.getIntField(ClusterConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(), -1);
  }

  /**
   * Set the maximum number of partitions allowed to assign to an instance in this cluster.
   *
   * @param globalMaxPartitionAllowedPerInstance the maximum number of partitions allowed
   */
  public void setGlobalMaxPartitionAllowedPerInstance(int globalMaxPartitionAllowedPerInstance) {
    _record.setIntField(ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(),
        globalMaxPartitionAllowedPerInstance);
  }

  /**
   * Get the maximum number of partitions allowed to assign to an instance in this cluster.
   *
   * @return the maximum number of partitions allowed, or -1 if the limit is not set
   */
  public int getGlobalMaxPartitionAllowedPerInstance() {
    return _record.getIntField(
        ClusterConfigProperty.GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE.name(), -1);
  }

  /**
   * Set the max offline instances allowed for the cluster. If number of offline or disabled
   * instances
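A small sketch of the new accessors in isolation (the single-argument ClusterConfig constructor takes a cluster name; "TestCluster" is a placeholder). The getter falls back to -1 when the field is unset, which is exactly the sentinel ResourceControllerDataProvider.refresh() checks before building the simple capacity set.

// Illustrative sketch, not part of the commit: default and round-trip of the new field.
ClusterConfig config = new ClusterConfig("TestCluster");
int unset = config.getGlobalMaxPartitionAllowedPerInstance();   // -1: limit not configured
config.setGlobalMaxPartitionAllowedPerInstance(100);
int limit = config.getGlobalMaxPartitionAllowedPerInstance();   // 100: global per-instance cap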
@@ -0,0 +1,85 @@
package org.apache.helix.controller.rebalancer;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.when;

public class TestGreedyRebalanceStrategy {
  private static final String TEST_CLUSTER_NAME = "TestCluster";
  private static final String TEST_RESOURCE_PREFIX = "TestResource_";

  @Test
  public void testAssignmentWithGlobalPartitionLimit() {

    ResourceControllerDataProvider clusterDataCache =
        Mockito.mock(ResourceControllerDataProvider.class);
    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2);
    states.put("OFFLINE", 0);
    states.put("ONLINE", 1);

    Set<CapacityNode> capacityNodeSet = new HashSet<>();
    for (int i = 0; i < 5; i++) {
      CapacityNode capacityNode = new CapacityNode("Node-" + i);
      capacityNode.setCapacity(1);
      capacityNodeSet.add(capacityNode);
    }

    List<String> liveNodes =
        capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());

    List<String> partitions = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
    }
    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);

    GreedyRebalanceStrategy greedyRebalanceStrategy = new GreedyRebalanceStrategy();
    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);

    partitions = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
    }
    greedyRebalanceStrategy = new GreedyRebalanceStrategy();
    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1);
    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);

    Assert.assertEquals(
        capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0);
  }
}
