Skip to content

Commit

Permalink
modify based on comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 4, 2024
1 parent 0a37cd3 commit 27d7ea0
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 77 deletions.
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@
import org.opensearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.opensearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportWlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.WlmStatsAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.TransportIndicesAliasesAction;
Expand Down Expand Up @@ -369,14 +369,14 @@
import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
import org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestQueryGroupStatsAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestWlmStatsAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
Expand Down Expand Up @@ -614,7 +614,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(QueryGroupStatsAction.INSTANCE, TransportQueryGroupStatsAction.class);
actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class);
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
Expand Down Expand Up @@ -816,7 +816,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestMainAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestQueryGroupStatsAction());
registerHandler.accept(new RestWlmStatsAction());
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,43 @@
*
* @opensearch.experimental
*/
public class TransportQueryGroupStatsAction extends TransportNodesAction<
QueryGroupStatsRequest,
QueryGroupStatsResponse,
QueryGroupStatsRequest,
QueryGroupStats> {
public class TransportWlmStatsAction extends TransportNodesAction<WlmStatsRequest, WlmStatsResponse, WlmStatsRequest, QueryGroupStats> {

QueryGroupService queryGroupService;

@Inject
public TransportQueryGroupStatsAction(
public TransportWlmStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
QueryGroupService queryGroupService,
ActionFilters actionFilters
) {
super(
QueryGroupStatsAction.NAME,
WlmStatsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
QueryGroupStatsRequest::new,
QueryGroupStatsRequest::new,
WlmStatsRequest::new,
WlmStatsRequest::new,
ThreadPool.Names.MANAGEMENT,
QueryGroupStats.class
);
this.queryGroupService = queryGroupService;
}

@Override
protected QueryGroupStatsResponse newResponse(
QueryGroupStatsRequest request,
protected WlmStatsResponse newResponse(
WlmStatsRequest request,
List<QueryGroupStats> queryGroupStats,
List<FailedNodeException> failures
) {
return new QueryGroupStatsResponse(clusterService.getClusterName(), queryGroupStats, failures);
return new WlmStatsResponse(clusterService.getClusterName(), queryGroupStats, failures);
}

@Override
protected QueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
protected WlmStatsRequest newNodeRequest(WlmStatsRequest request) {
return request;
}

Expand All @@ -77,7 +73,7 @@ protected QueryGroupStats newNodeResponse(StreamInput in) throws IOException {
}

@Override
protected QueryGroupStats nodeOperation(QueryGroupStatsRequest queryGroupStatsRequest) {
return queryGroupService.nodeStats(queryGroupStatsRequest.getQueryGroupIds(), queryGroupStatsRequest.isBreach());
protected QueryGroupStats nodeOperation(WlmStatsRequest wlmStatsRequest) {
return queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import org.opensearch.action.ActionType;

/**
* Transport action for obtaining QueryGroup Stats.
* Transport action for obtaining Workload Management Stats.
*
* @opensearch.experimental
*/
public class QueryGroupStatsAction extends ActionType<QueryGroupStatsResponse> {
public static final QueryGroupStatsAction INSTANCE = new QueryGroupStatsAction();
public class WlmStatsAction extends ActionType<WlmStatsResponse> {
public static final WlmStatsAction INSTANCE = new WlmStatsAction();
public static final String NAME = "cluster:monitor/query_group_stats";

private QueryGroupStatsAction() {
super(NAME, QueryGroupStatsResponse::new);
private WlmStatsAction() {
super(NAME, WlmStatsResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
import java.util.Set;

/**
* A request to get QueryGroupStats
* A request to get Workload Management Stats
*/
@ExperimentalApi
public class QueryGroupStatsRequest extends BaseNodesRequest<QueryGroupStatsRequest> {
public class WlmStatsRequest extends BaseNodesRequest<WlmStatsRequest> {

private final Set<String> queryGroupIds;
private final Boolean breach;

public QueryGroupStatsRequest(StreamInput in) throws IOException {
public WlmStatsRequest(StreamInput in) throws IOException {
super(in);
this.queryGroupIds = new HashSet<>(Set.of(in.readStringArray()));
this.breach = in.readOptionalBoolean();
Expand All @@ -36,13 +36,13 @@ public QueryGroupStatsRequest(StreamInput in) throws IOException {
* Get QueryGroup stats from nodes based on the nodes ids specified. If none are passed, stats
* for all nodes will be returned.
*/
public QueryGroupStatsRequest(String[] nodesIds, Set<String> queryGroupIds, boolean breach) {
public WlmStatsRequest(String[] nodesIds, Set<String> queryGroupIds, boolean breach) {
super(false, nodesIds);
this.queryGroupIds = queryGroupIds;
this.breach = breach;
}

public QueryGroupStatsRequest() {
public WlmStatsRequest() {
super(false, (String[]) null);
queryGroupIds = new HashSet<>();
this.breach = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import java.util.List;

/**
* A response for obtaining QueryGroupStats
* A response for obtaining Workload Management Stats
*/
@ExperimentalApi
public class QueryGroupStatsResponse extends BaseNodesResponse<QueryGroupStats> implements ToXContentFragment {
public class WlmStatsResponse extends BaseNodesResponse<QueryGroupStats> implements ToXContentFragment {

QueryGroupStatsResponse(StreamInput in) throws IOException {
WlmStatsResponse(StreamInput in) throws IOException {
super(in);
}

QueryGroupStatsResponse(ClusterName clusterName, List<QueryGroupStats> nodes, List<FailedNodeException> failures) {
WlmStatsResponse(ClusterName clusterName, List<QueryGroupStats> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
* compatible open source license.
*/

/** QueryGroupStats transport handlers. */
/** WlmStats transport handlers. */
package org.opensearch.action.admin.cluster.wlm;
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse;
import org.opensearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
Expand Down Expand Up @@ -328,7 +328,7 @@ public interface ClusterAdminClient extends OpenSearchClient {
* @param request The QueryGroupStatsRequest
* @param listener A listener to be notified with a result
*/
void queryGroupStats(QueryGroupStatsRequest request, ActionListener<QueryGroupStatsResponse> listener);
void wlmStats(WlmStatsRequest request, ActionListener<WlmStatsResponse> listener);

void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener<RemoteStoreStatsResponse> listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse;
import org.opensearch.action.admin.cluster.wlm.WlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand Down Expand Up @@ -922,8 +922,8 @@ public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) {
}

@Override
public void queryGroupStats(final QueryGroupStatsRequest request, final ActionListener<QueryGroupStatsResponse> listener) {
execute(QueryGroupStatsAction.INSTANCE, request, listener);
public void wlmStats(final WlmStatsRequest request, final ActionListener<WlmStatsResponse> listener) {
execute(WlmStatsAction.INSTANCE, request, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.opensearch.rest.action.admin.cluster;

import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.BaseRestHandler;
Expand All @@ -24,11 +24,11 @@
import static org.opensearch.rest.RestRequest.Method.GET;

/**
* Transport action to get QueryGroup stats
* Transport action to get Workload Management stats
*
* @opensearch.experimental
*/
public class RestQueryGroupStatsAction extends BaseRestHandler {
public class RestWlmStatsAction extends BaseRestHandler {

@Override
public List<Route> routes() {
Expand All @@ -52,9 +52,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
Set<String> queryGroupIds = Strings.tokenizeByCommaToSet(request.param("queryGroupId", "_all"));
Boolean breach = request.hasParam("breach") ? Boolean.parseBoolean(request.param("boolean")) : null;
QueryGroupStatsRequest queryGroupStatsRequest = new QueryGroupStatsRequest(nodesIds, queryGroupIds, breach);
return channel -> client.admin()
.cluster()
.queryGroupStats(queryGroupStatsRequest, new RestActions.NodesResponseRestListener<>(channel));
WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(nodesIds, queryGroupIds, breach);
return channel -> client.admin().cluster().wlmStats(wlmStatsRequest, new RestActions.NodesResponseRestListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Map;
import java.util.Set;

public class QueryGroupStatsResponseTests extends OpenSearchTestCase {
public class WlmStatsResponseTests extends OpenSearchTestCase {
ClusterName clusterName = new ClusterName("test-cluster");
String testQueryGroupId = "safjgagnaeekg-3r3fads";
DiscoveryNode node = new DiscoveryNode(
Expand Down Expand Up @@ -59,25 +59,17 @@ public class QueryGroupStatsResponseTests extends OpenSearchTestCase {
List<FailedNodeException> failedNodeExceptionList = new ArrayList<>();

public void testSerializationAndDeserialization() throws IOException {
QueryGroupStatsResponse queryGroupStatsResponse = new QueryGroupStatsResponse(
clusterName,
queryGroupStatsList,
failedNodeExceptionList
);
WlmStatsResponse queryGroupStatsResponse = new WlmStatsResponse(clusterName, queryGroupStatsList, failedNodeExceptionList);
BytesStreamOutput out = new BytesStreamOutput();
queryGroupStatsResponse.writeTo(out);
StreamInput in = out.bytes().streamInput();
QueryGroupStatsResponse deserializedResponse = new QueryGroupStatsResponse(in);
WlmStatsResponse deserializedResponse = new WlmStatsResponse(in);
assertEquals(queryGroupStatsResponse.getClusterName(), deserializedResponse.getClusterName());
assertEquals(queryGroupStatsResponse.getNodes().size(), deserializedResponse.getNodes().size());
}

public void testToString() {
QueryGroupStatsResponse queryGroupStatsResponse = new QueryGroupStatsResponse(
clusterName,
queryGroupStatsList,
failedNodeExceptionList
);
WlmStatsResponse queryGroupStatsResponse = new WlmStatsResponse(clusterName, queryGroupStatsList, failedNodeExceptionList);
String responseString = queryGroupStatsResponse.toString();
assertEquals(
"{\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportWlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -26,15 +26,15 @@

import static org.mockito.Mockito.mock;

public class TransportQueryGroupStatsActionTests extends TransportNodesActionTests {
public class TransportWlmStatsActionTests extends TransportNodesActionTests {

/**
* We don't want to send discovery nodes list to each request that is sent across from the coordinator node.
* This behavior is asserted in this test.
*/
public void testQueryGroupStatsActionWithRetentionOfDiscoveryNodesList() {
QueryGroupStatsRequest request = new QueryGroupStatsRequest();
Map<String, List<QueryGroupStatsRequest>> combinedSentRequest = performQueryGroupStatsAction(request);
public void testWlmStatsActionWithRetentionOfDiscoveryNodesList() {
WlmStatsRequest request = new WlmStatsRequest();
Map<String, List<WlmStatsRequest>> combinedSentRequest = performWlmStatsAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
Expand All @@ -43,30 +43,30 @@ public void testQueryGroupStatsActionWithRetentionOfDiscoveryNodesList() {
});
}

private Map<String, List<QueryGroupStatsRequest>> performQueryGroupStatsAction(QueryGroupStatsRequest request) {
TransportNodesAction action = new TransportQueryGroupStatsAction(
private Map<String, List<WlmStatsRequest>> performWlmStatsAction(WlmStatsRequest request) {
TransportNodesAction action = new TransportWlmStatsAction(
THREAD_POOL,
clusterService,
transportService,
mock(QueryGroupService.class),
new ActionFilters(Collections.emptySet())
);
PlainActionFuture<QueryGroupStatsRequest> listener = new PlainActionFuture<>();
PlainActionFuture<WlmStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<QueryGroupStatsRequest>> combinedSentRequest = new HashMap<>();
Map<String, List<WlmStatsRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<QueryGroupStatsRequest> sentRequestList = new ArrayList<>();
List<WlmStatsRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
QueryGroupStatsRequest QueryGroupStatsRequestFromCoordinator = (QueryGroupStatsRequest) preSentRequest.request;
QueryGroupStatsRequestFromCoordinator.writeTo(out);
WlmStatsRequest wlmStatsRequestFromCoordinator = (WlmStatsRequest) preSentRequest.request;
wlmStatsRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
QueryGroupStatsRequest QueryGroupStatsRequest = new QueryGroupStatsRequest(in);
sentRequestList.add(QueryGroupStatsRequest);
WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(in);
sentRequestList.add(wlmStatsRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 27d7ea0

Please sign in to comment.