Skip to content

Commit

Permalink
Integrated IO Based AdmissionController to AdmissionControl Framework
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Mar 11, 2024
1 parent b15cb0c commit b0f31ea
Show file tree
Hide file tree
Showing 12 changed files with 999 additions and 72 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -706,6 +707,9 @@ public void apply(Settings value, Settings current, Settings previous) {
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,

// Concurrent segment search settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER;
import static org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
Expand Down Expand Up @@ -67,6 +72,7 @@ public AdmissionControlService(
private void initialise() {
// Initialise different type of admission controllers
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER);
}

/**
Expand Down Expand Up @@ -101,6 +107,13 @@ private AdmissionController controllerFactory(String admissionControllerName) {
this.clusterService,
this.settings
);
case IO_BASED_ADMISSION_CONTROLLER:
return new IoBasedAdmissionController(
admissionControllerName,
this.resourceUsageCollectorService,
this.clusterService,
this.settings
);
default:
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
}
Expand Down Expand Up @@ -128,13 +141,31 @@ public AdmissionController getAdmissionController(String controllerName) {
*/
public AdmissionControlStats stats() {
List<AdmissionControllerStats> statsList = new ArrayList<>();
if (this.admissionControllers.size() > 0) {
if (!this.admissionControllers.isEmpty()) {
this.admissionControllers.forEach((controllerName, admissionController) -> {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
statsList.add(admissionControllerStats);
if (controllerName.equals(IO_BASED_ADMISSION_CONTROLLER)) {
if (Constants.LINUX) {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
if (admissionControllerStats.rejectionCount != null) {
statsList.add(admissionControllerStats);
}
}
} else {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
statsList.add(admissionControllerStats);
}
});
return new AdmissionControlStats(statsList);
}
return null;
}

// used for testing
Map<String, AdmissionControllerStats> getStats() {
Map<String, AdmissionControllerStats> acStats = new HashMap<>();
for (AdmissionControllerStats admissionControllerStats : this.stats().getAdmissionControllerStatsList()) {
acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats);
}
return acStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
* and admission control can be applied if configured limit has been reached
*/
public abstract class AdmissionController {

private final String admissionControllerName;
final ResourceUsageCollectorService resourceUsageCollectorService;
public final Map<String, AtomicLong> rejectionCountMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol.controllers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings;

import java.util.Locale;
import java.util.Optional;

public class IoBasedAdmissionController extends AdmissionController {
public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage";
private static final Logger LOGGER = LogManager.getLogger(IoBasedAdmissionController.class);
public IoBasedAdmissionControllerSettings settings;

/**
* @param admissionControllerName name of the admissionController
* @param resourceUsageCollectorService instance used to get resource usage stats of the node
* @param clusterService instance of the clusterService
*/
public IoBasedAdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
ClusterService clusterService,
Settings settings
) {
super(admissionControllerName, resourceUsageCollectorService, clusterService);
this.settings = new IoBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
}

@Override
public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlMode) {
if (Constants.LINUX) {
return super.isEnabledForTransportLayer(admissionControlMode);
}
return false;
}

/**
* Apply admission control based on the resource usage for an action
*
* @param action is the transport action
* @param admissionControlActionType type of admissionControlActionType
*/
@Override
public void apply(String action, AdmissionControlActionType admissionControlActionType) {
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
this.applyForTransportLayer(action, admissionControlActionType);
}
}

/**
* Apply transport layer admission control if configured limit has been reached
*/
private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
if (isLimitsBreached(actionName, admissionControlActionType)) {
this.addRejectionCount(admissionControlActionType.getType(), 1);
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
throw new OpenSearchRejectedExecutionException(
String.format(
Locale.ROOT,
"Io usage admission controller rejected the request for action [%s] as IO limit reached",
admissionControlActionType.name()
)
);
}
}
}

/**
* Check if the configured resource usage limits are breached for the action
*/
private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
// check if cluster state is ready
if (clusterService.state() != null && clusterService.state().nodes() != null) {
long maxIoLimit = this.getIoRejectionThreshold(admissionControlActionType);
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
this.clusterService.state().nodes().getLocalNodeId()
);
if (nodePerformanceStatistics.isPresent()) {
double ioUsage = nodePerformanceStatistics.get().getIoUsageStats().getIoUtilisationPercent();
if (ioUsage >= maxIoLimit) {
LOGGER.warn(
"IoBasedAdmissionController limit reached as the current IO "
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]",
ioUsage,
maxIoLimit,
actionName,
this.settings.getTransportLayerAdmissionControllerMode()
);
return true;
}
}
}
return false;
}

/**
* Get IO rejection threshold based on action type
*/
private long getIoRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
switch (admissionControlActionType) {
case SEARCH:
return this.settings.getSearchIOUsageLimit();
case INDEXING:
return this.settings.getIndexingIOUsageLimit();
default:
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Admission control not Supported for AdmissionControlActionType: %s",
admissionControlActionType.getType()
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
public enum AdmissionControlActionType {
INDEXING("indexing"),
SEARCH("search");
SEARCH("search"),
CLUSTER_INFO("cluster_info");

private final String type;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol.settings;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

/**
* Settings related to IO based admission controller.
* @opensearch.internal
*/
public class IoBasedAdmissionControllerSettings {

/**
* Default parameters for the IoBasedAdmissionControllerSettings
*/
public static class Defaults {
public static final long IO_USAGE_LIMIT = 95;
}

private AdmissionControlMode transportLayerMode;
private Long searchIOUsageLimit;
private Long indexingIOUsageLimit;

/**
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set
* rejection will be performed, otherwise only rejection metrics will be populated.
*/
public static final Setting<AdmissionControlMode> IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>(
"admission_control.transport.io_usage.mode_override",
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
AdmissionControlMode::fromName,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* This setting used to set the IO Limits for the search requests by default it will use default IO usage limit
*/
public static final Setting<Long> SEARCH_IO_USAGE_LIMIT = Setting.longSetting(
"admission_control.search.io_usage.limit",
Defaults.IO_USAGE_LIMIT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* This setting used to set the IO limits for the indexing requests by default it will use default IO usage limit
*/
public static final Setting<Long> INDEXING_IO_USAGE_LIMIT = Setting.longSetting(
"admission_control.indexing.io_usage.limit",
Defaults.IO_USAGE_LIMIT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public IoBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) {
this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings);
clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode);
this.searchIOUsageLimit = SEARCH_IO_USAGE_LIMIT.get(settings);
this.indexingIOUsageLimit = INDEXING_IO_USAGE_LIMIT.get(settings);
clusterSettings.addSettingsUpdateConsumer(INDEXING_IO_USAGE_LIMIT, this::setIndexingIOUsageLimit);
clusterSettings.addSettingsUpdateConsumer(SEARCH_IO_USAGE_LIMIT, this::setSearchIOUsageLimit);
}

public void setIndexingIOUsageLimit(Long indexingIOUsageLimit) {
this.indexingIOUsageLimit = indexingIOUsageLimit;
}

public void setSearchIOUsageLimit(Long searchIOUsageLimit) {
this.searchIOUsageLimit = searchIOUsageLimit;
}

public AdmissionControlMode getTransportLayerAdmissionControllerMode() {
return transportLayerMode;
}

public void setTransportLayerMode(AdmissionControlMode transportLayerMode) {
this.transportLayerMode = transportLayerMode;
}

public Long getIndexingIOUsageLimit() {
return indexingIOUsageLimit;
}

public Long getSearchIOUsageLimit() {
return searchIOUsageLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public void tearDown() throws Exception {

public void testWhenAdmissionControllerRegistered() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
}

public void testRegisterInvalidAdmissionController() {
String test = "TEST";
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
assertEquals(admissionControlService.getAdmissionControllers().size(), 1);
assertEquals(admissionControlService.getAdmissionControllers().size(), 2);
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> admissionControlService.registerAdmissionController(test)
Expand All @@ -66,7 +66,7 @@ public void testAdmissionControllerSettings() {
admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null);
AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings;
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
assertEquals(admissionControllerList.size(), 2);
CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService
.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER);
assertEquals(
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testApplyAdmissionControllerEnabled() {
.build();
clusterService.getClusterSettings().applySettings(settings);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
assertEquals(admissionControllerList.size(), 2);
}

public void testApplyAdmissionControllerEnforced() {
Expand All @@ -153,6 +153,6 @@ public void testApplyAdmissionControllerEnforced() {
.build();
clusterService.getClusterSettings().applySettings(settings);
List<AdmissionController> admissionControllerList = admissionControlService.getAdmissionControllers();
assertEquals(admissionControllerList.size(), 1);
assertEquals(admissionControllerList.size(), 2);
}
}
Loading

0 comments on commit b0f31ea

Please sign in to comment.