Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FS stats #16

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9f48a02
fixing input validation in segments and delete pit request
bharath-techie Mar 13, 2023
bc663c6
Merge branch 'opensearch-project:main' into main
bharath-techie Mar 31, 2023
aab583e
Merge branch 'main' into main
bharath-techie Apr 7, 2023
c3401e8
Merge branch 'main' of https://github.com/opensearch-project/OpenSearch
bharath-techie Apr 10, 2023
7b01186
Merge branch 'main' of https://github.com/opensearch-project/OpenSearch
bharath-techie Apr 11, 2023
9887230
Merge branch 'main' into main
bharath-techie May 18, 2023
84ee421
Update CHANGELOG.md - addressing comments
bharath-techie May 18, 2023
3072b8e
addressing comments
bharath-techie May 24, 2023
c39f550
Merge branch 'main' of https://github.com/opensearch-project/OpenSearch
bharath-techie May 24, 2023
197cbec
addressing comments
bharath-techie May 24, 2023
d49d0e5
Merge branch 'main' of https://github.com/opensearch-project/OpenSearch
bharath-techie May 25, 2023
ba7a1af
Merge branch 'main' of github.com:opensearch-project/OpenSearch
bharath-techie Jun 6, 2023
d391d85
Merge branch 'main' of github.com:opensearch-project/OpenSearch
bharath-techie Jun 20, 2023
27ec351
AC poc changes by ajay
bharath-techie Jun 21, 2023
e4be79d
adding node perf stats--signoff
bharath-techie Jun 21, 2023
2c98a0d
adding perf stats to ranking
bharath-techie Jun 27, 2023
9fb3253
more changes
bharath-techie Jul 2, 2023
a046f9d
latency
bharath-techie Jul 21, 2023
0ddb0e2
latency
bharath-techie Jul 21, 2023
cb0adac
latency
bharath-techie Jul 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,16 @@ protected void doRun() {
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
threadPool.getThreadContext().getResponseHeaders();
if(threadPool.getThreadContext().getTransient("PERF_STATS") != null) {
// Map<String, NodePerfStats> nodePerfStats = (Map<String, NodePerfStats>) threadPool.getThreadContext().getTransient("PERF_STATS");
// for (NodePerfStats perfStats : nodePerfStats.values()) {
// logger.info("Response " +
// "CPU : {} , MEM : {} , IO : {}", perfStats.cpuPercentAvg, perfStats.memoryPercentAvg, perfStats.ioPercentAvg);
// }
}
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
clusterService.state().getRoutingTable().shardRoutingTable(bulkShardResponse.getShardId());
// we may have no response if item failed
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.action.update.UpdateHelper;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.admissioncontroller.NodePerfStats;
import org.opensearch.client.transport.NoNodeAvailableException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
Expand All @@ -77,6 +78,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -108,9 +110,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -428,7 +428,7 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool, executor(primary));
}), listener, threadPool, executor(primary), clusterService.localNode().getId());
}

@Override
Expand All @@ -453,7 +453,8 @@ public static void performOnPrimary(
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
ThreadPool threadPool,
String executorName
String executorName,
String nodeId
) {
new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {

Expand Down Expand Up @@ -516,6 +517,24 @@ public boolean isForceExecution() {
}

private void finishRequest() {
Map<String, NodePerfStats> nodePerfStatsMap = new HashMap();
NodePerfStats nodePerfStats = new NodePerfStats(0.95, 0.95,0.95);
nodePerfStatsMap.put(nodeId, nodePerfStats);
ThreadContext threadContext = threadPool.getThreadContext();
threadContext.addResponseHeader("PERF_STATS", String.valueOf(nodePerfStats.cpuPercentAvg) + "-"
+ String.valueOf(nodePerfStats.memoryPercentAvg) + "-" + String.valueOf(nodePerfStats.ioPercentAvg));
// Map<String, NodePerfStats> np = new HashMap<>();
// if(threadContext.getTransient("PERF_STATS") != null ) {
// np = threadContext.getTransient("PERF_STATS");
// }
// np.put(nodeId, nodePerfStats);
//ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS"));
//ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS"));
// threadContext.putTransient("PERF_STATS", nodePerfStats);
// threadContext.putHeader("PERF_STATS", nodePerfStatsMap);
// ThreadContext.StoredContext storedContext1 = threadContext.newStoredContext(true, Collections.singletonList("T_ID"));
// threadContext.putTransient("T_ID", "nodePerfStats");

ActionListener.completeWith(
listener,
() -> new WritePrimaryResult<>(
Expand All @@ -527,6 +546,8 @@ private void finishRequest() {
logger
)
);
//storedContext.close();
// storedContext1.close();
}
}.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
}

shardsIt = clusterService.operationRouting()
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null, null);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.admissioncontroller.NodePerfStats;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.fetch.QueryFetchSearchResult;
Expand Down Expand Up @@ -91,9 +92,10 @@ public void onResponse(SearchPhaseResult response) {
final long serviceTimeEWMA = queryResult.serviceTimeEWMA();
final int queueSize = queryResult.nodeQueueSize();
final long responseDuration = System.nanoTime() - startNanos;
final NodePerfStats nodePerfStats = queryResult.getNodePerfStats();
// EWMA/queue size may be -1 if the query node doesn't support capturing it
if (serviceTimeEWMA > 0 && queueSize >= 0) {
collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA);
collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA, nodePerfStats);
}
}
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,8 @@ private void executeSearch(
routingMap,
searchRequest.preference(),
searchService.getResponseCollectorService(),
nodeSearchCounts
nodeSearchCounts,
searchService.getAdmissionControllerService()
);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,8 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
new ChannelActionListener<>(channel, transportPrimaryAction, request),
releasable::close
);

try {
new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
// here
try {new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.admissioncontroller;

import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.List;

/**
 * Network plugin that contributes the admission controller's transport interceptor.
 */
public class AdmissionControllerPlugin extends Plugin implements NetworkPlugin {

    /** Service handed to every interceptor created by this plugin. */
    public AdmissionControllerService admissionControllerService;

    /**
     * Creates the plugin.
     *
     * @param admissionControllerService service passed on to the transport interceptor
     */
    public AdmissionControllerPlugin(AdmissionControllerService admissionControllerService) {
        this.admissionControllerService = admissionControllerService;
    }

    /**
     * Builds the list of transport interceptors supplied by this plugin.
     * <p>
     * The list holds exactly one {@link AdmissionControllerTransportInterceptor}, constructed with
     * {@code null} as its first argument and this plugin's service as its second. Neither method
     * parameter is consulted.
     *
     * @param namedWriteableRegistry unused
     * @param threadContext unused
     * @return a new mutable list containing the single admission controller interceptor
     */
    @Override
    public List<TransportInterceptor> getTransportInterceptors(
        NamedWriteableRegistry namedWriteableRegistry,
        ThreadContext threadContext
    ) {
        final List<TransportInterceptor> result = new ArrayList<>(1);
        final TransportInterceptor admissionInterceptor = new AdmissionControllerTransportInterceptor(
            null,
            admissionControllerService
        );
        result.add(admissionInterceptor);
        return result;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.admissioncontroller;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;

/**
 * Transport request handler decorator that consults the {@link AdmissionControllerService}
 * before handing the request to the wrapped handler.
 * <p>
 * In its current form the handler never rejects a request: when the service reports IO stress
 * it only logs that fact, and the request is always forwarded to the wrapped handler.
 *
 * @param <T> type of transport request processed by the wrapped handler
 */
public class AdmissionControllerRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {

    /** Action-name prefix that {@link #isSearchRequest()} matches against. */
    private static final String SEARCH_ACTION_PREFIX = "indices:data/read/search";

    /** Action-name prefix that {@link #isIndexRequest()} matches against. */
    private static final String WRITE_ACTION_PREFIX = "indices:data/write";

    private final String action;
    private final TransportRequestHandler<T> actualHandler;
    private final ThreadPool threadPool;
    protected final Logger log = LogManager.getLogger(this.getClass());
    public AdmissionControllerService admissionControllerService;

    /**
     * Creates a decorator around {@code actualHandler}.
     *
     * @param action name of the transport action this handler is registered for
     * @param actualHandler handler that receives every request after the admission check
     * @param threadPool thread pool used to look up the thread context; may be {@code null}
     * @param admissionControllerService service queried for the IO stress state
     */
    public AdmissionControllerRequestHandler(
        String action,
        TransportRequestHandler<T> actualHandler,
        ThreadPool threadPool,
        AdmissionControllerService admissionControllerService
    ) {
        this.action = action;
        this.actualHandler = actualHandler;
        this.threadPool = threadPool;
        this.admissionControllerService = admissionControllerService;
    }

    /**
     * Returns the thread context of the configured thread pool.
     *
     * @return the thread context, or {@code null} when no thread pool was supplied
     */
    protected ThreadContext getThreadContext() {
        if (threadPool == null) {
            return null;
        }
        return threadPool.getThreadContext();
    }

    /**
     * @return {@code true} when the action name starts with {@code indices:data/read/search}
     */
    public boolean isSearchRequest() {
        return this.action.startsWith(SEARCH_ACTION_PREFIX);
    }

    /**
     * @return {@code true} when the action name starts with {@code indices:data/write}
     */
    public boolean isIndexRequest() {
        return this.action.startsWith(WRITE_ACTION_PREFIX);
    }

    /**
     * Checks the IO stress state, logs when the node is under stress, and then forwards the
     * request to the wrapped handler.
     *
     * @param request incoming transport request
     * @param channel channel used to send the response
     * @param task task associated with the request
     * @throws Exception whatever the wrapped handler throws
     */
    @Override
    public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
        // Observation only: the stress state is logged but the request is never rejected here.
        if (this.admissionControllerService.isIOInStress()) {
            log.info("Admission controller service responded with IO is in stress state");
        }
        this.messageReceivedDecorate(request, actualHandler, channel, task);
    }

    /**
     * Delegates the request to {@code actualHandler}. Subclasses may override this to wrap the call.
     *
     * @param request incoming transport request
     * @param actualHandler handler that processes the request
     * @param transportChannel channel used to send the response
     * @param task task associated with the request
     * @throws Exception whatever {@code actualHandler} throws
     */
    protected void messageReceivedDecorate(
        final T request,
        final TransportRequestHandler<T> actualHandler,
        final TransportChannel transportChannel,
        Task task
    ) throws Exception {
        actualHandler.messageReceived(request, transportChannel, task);
    }
}
Loading