Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Javadoc changes and more #20

Open
wants to merge 2 commits into
base: ac-pr-integ
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats
- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException {
} else {
repositoriesStats = null;
}
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
} else {
admissionControlStats = null;
Expand Down Expand Up @@ -504,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(admissionControlStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
// Register only TransportShardBulkAction for admission control ( primary indexing action )
if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,19 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
return actualHandler;
}

/**
* Intercept the transport action and perform admission control if applicable
*/
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler,
AdmissionControlActionType transportActionType
AdmissionControlActionType admissionControlActionType
) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType);
}
return actualHandler;
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public NodeStats stats(
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats(): null
admissionControl ? this.admissionControlService.stats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER;
import static org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
Expand All @@ -47,8 +45,14 @@ public class AdmissionControlService {
* @param settings Immutable settings instance
* @param clusterService ClusterService Instance
* @param threadPool ThreadPool Instance
* @param resourceUsageCollectorService Instance used to get node resource usage stats
*/
public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) {
public AdmissionControlService(
Settings settings,
ClusterService clusterService,
ThreadPool threadPool,
ResourceUsageCollectorService resourceUsageCollectorService
) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
Expand All @@ -68,11 +72,13 @@ private void initialise() {

/**
*
* @param action transport action that is being executed. we are using it for logging while request is rejected
* @param admissionControlActionType type of the admissionControllerActionType
* @param action Transport action name
* @param admissionControlActionType admissionControllerActionType value
*/
public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); });
this.ADMISSION_CONTROLLERS.forEach(
(name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }
);
}

/**
Expand All @@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) {
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService);
return new CPUBasedAdmissionController(
admissionControllerName,
this.resourceUsageCollectorService,
this.clusterService,
this.settings
);
default:
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
}
Expand All @@ -113,26 +124,18 @@ public AdmissionController getAdmissionController(String controllerName) {
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
}

public AdmissionControlStats stats(){
List<BaseAdmissionControllerStats> statsList = new ArrayList<>();
if(this.ADMISSION_CONTROLLERS.size() > 0){
/**
* Return admission control stats
*/
public AdmissionControlStats stats() {
List<AdmissionControllerStats> statsList = new ArrayList<>();
if (this.ADMISSION_CONTROLLERS.size() > 0) {
this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> {
BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController);
if(admissionControllerStats != null) {
statsList.add(admissionControllerStats);
}
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName);
statsList.add(admissionControllerStats);
});
return new AdmissionControlStats(statsList);
}
return null;
}

private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) {
switch (admissionController.getName()) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionControllerStats(admissionController);
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

Expand All @@ -33,12 +32,16 @@ public abstract class AdmissionController {
public final ClusterService clusterService;

/**
* @param rejectionCount initialised rejectionCount value for AdmissionController
* @param admissionControllerName name of the admissionController
* @param admissionControllerName name of the admissionController
* @param resourceUsageCollectorService instance used to get resource usage stats of the node
* @param clusterService
*/
public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) {
this.rejectionCount = rejectionCount;
public AdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
ClusterService clusterService
) {
this.rejectionCount = new AtomicLong(0);
this.admissionControllerName = admissionControllerName;
this.resourceUsageCollectorService = resourceUsageCollectorService;
this.clusterService = clusterService;
Expand All @@ -62,8 +65,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr
}

/**
* Increment the tracking-objects and apply the admission control if threshold is breached.
* Mostly applicable while applying admission controller
* Apply admission control based on the resource usage for an action
*/
public abstract void apply(String action, AdmissionControlActionType admissionControlActionType);

Expand All @@ -74,9 +76,12 @@ public String getName() {
return this.admissionControllerName;
}

/**
* Add rejection count to the rejection count metric tracked by the admission controller
*/
public void addRejectionCount(String admissionControlActionType, long count) {
AtomicLong updatedCount = new AtomicLong(0);
if(this.rejectionCountMap.containsKey(admissionControlActionType)){
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get());
}
updatedCount.addAndGet(count);
Expand All @@ -91,6 +96,9 @@ public long getRejectionCount(String admissionControlActionType) {
return rejectionCount.get();
}

/**
* Get rejection stats of the admission controller
*/
public Map<String, Long> getRejectionStats() {
Map<String, Long> rejectionStats = new HashMap<>();
rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,103 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
* Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control.
* It provides methods to apply admission control if configured limit has been reached
*/
public class CPUBasedAdmissionController extends AdmissionController {
public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage";
private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class);
public CPUBasedAdmissionControllerSettings settings;

/**
*
* @param admissionControllerName State of the admission controller
* @param admissionControllerName Name of the admission controller
* @param resourceUsageCollectorService Instance used to get node resource usage stats
* @param clusterService ClusterService Instance
* @param settings Immutable settings instance
*/
public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) {
super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService);
public CPUBasedAdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
ClusterService clusterService,
Settings settings
) {
super(admissionControllerName, resourceUsageCollectorService, clusterService);
this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
}

/**
* This function will take of applying admission controller based on CPU usage
* Apply admission control based on process CPU usage
* @param action is the transport action
*/
@Override
public void apply(String action, AdmissionControlActionType admissionControlActionType) {
// TODO Will extend this logic further currently just incrementing rejectionCount
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
this.applyForTransportLayer(action, admissionControlActionType);
}
}

/**
* Apply transport layer admission control if configured limit has been reached
*/
private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
if (isLimitsBreached(admissionControlActionType)) {
if (isLimitsBreached(actionName, admissionControlActionType)) {
this.addRejectionCount(admissionControlActionType.getType(), 1);
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached");
throw new OpenSearchRejectedExecutionException(
String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name())
);
}
}
}

private boolean isLimitsBreached(AdmissionControlActionType transportActionType) {
long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType);
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId());
if(nodePerformanceStatistics.isPresent()) {
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
if (cpuUsage >= maxCpuLimit){
LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" +
cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]");
return true;
/**
* Check if the configured resource usage limits are breached for the action
*/
private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
// check if cluster state is ready
if (clusterService.state() != null && clusterService.state().nodes() != null) {
long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType);
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
this.clusterService.state().nodes().getLocalNodeId()
);
if (nodePerformanceStatistics.isPresent()) {
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
if (cpuUsage >= maxCpuLimit) {
LOGGER.warn(
"CpuBasedAdmissionController rejected the request as the current CPU "
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}]",
cpuUsage,
maxCpuLimit,
actionName
);
return true;
}
}
}
return false;
}
private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) {
switch (transportActionType) {

/**
* Get CPU rejection threshold based on action type
*/
private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
switch (admissionControlActionType) {
case SEARCH:
return this.settings.getSearchCPULimit();
case INDEXING:
return this.settings.getIndexingCPULimit();
default:
throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType());
throw new IllegalArgumentException(
String.format(
"Admission control not Supported for AdmissionControlActionType: %s",
admissionControlActionType.getType()
)
);
}
}
}
Loading