Skip to content

Commit

Permalink
Add Create QueryGroup API Logic (opensearch-project#14680)
Browse files Browse the repository at this point in the history
* add logic for Create QueryGroup API
Signed-off-by: Ruirui Zhang <[email protected]>

* remove wildcard imports
Signed-off-by: Ruirui Zhang <[email protected]>

* change jvm to memeory
Signed-off-by: Ruirui Zhang <[email protected]>

* modify querygroup
Signed-off-by: Ruirui Zhang <[email protected]>

* fix javadoc and add more tests
Signed-off-by: Ruirui Zhang <[email protected]>

* add more tests
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* fix the persist logic
Signed-off-by: Ruirui Zhang <[email protected]>

* remove inflight checks as they are not necessary
Signed-off-by: Ruirui Zhang <[email protected]>

* remove persistable interface
Signed-off-by: Ruirui Zhang <[email protected]>

* modify QueryGroupServiceSettings
Signed-off-by: Ruirui Zhang <[email protected]>

* add in an action package in the plugin
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on commments
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments on QueryGroupPersistenceService
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments on persistence service
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* fix unit test
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* add IT
Signed-off-by: Ruirui Zhang <[email protected]>

* add coverage
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 authored Aug 13, 2024
1 parent 2311195 commit d442f7c
Show file tree
Hide file tree
Showing 24 changed files with 1,427 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- [Streaming Indexing] Enhance RestClient with a new streaming API support ([#14437](https://github.com/opensearch-project/OpenSearch/pull/14437))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))
- Add ThreadContextPermission for markAsSystemContext and allow core to perform the method ([#15016](https://github.com/opensearch-project/OpenSearch/pull/15016))
- Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039))
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
Expand Down
21 changes: 21 additions & 0 deletions plugins/workload-management/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'OpenSearch Workload Management Plugin.'
classname 'org.opensearch.plugin.wlm.WorkloadManagementPlugin'
}

dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm;

import org.opensearch.action.ActionRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.plugin.wlm.action.CreateQueryGroupAction;
import org.opensearch.plugin.wlm.action.TransportCreateQueryGroupAction;
import org.opensearch.plugin.wlm.rest.RestCreateQueryGroupAction;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;

import java.util.List;
import java.util.function.Supplier;

/**
* Plugin class for WorkloadManagement
*/
public class WorkloadManagementPlugin extends Plugin implements ActionPlugin {

/**
* Default constructor
*/
public WorkloadManagementPlugin() {}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(new ActionPlugin.ActionHandler<>(CreateQueryGroupAction.INSTANCE, TransportCreateQueryGroupAction.class));
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestCreateQueryGroupAction());
}

@Override
public List<Setting<?>> getSettings() {
return List.of(QueryGroupPersistenceService.MAX_QUERY_GROUP_COUNT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionType;

/**
* Transport action to create QueryGroup
*
* @opensearch.experimental
*/
public class CreateQueryGroupAction extends ActionType<CreateQueryGroupResponse> {

/**
* An instance of CreateQueryGroupAction
*/
public static final CreateQueryGroupAction INSTANCE = new CreateQueryGroupAction();

/**
* Name for CreateQueryGroupAction
*/
public static final String NAME = "cluster:admin/opensearch/wlm/query_group/_create";

/**
* Default constructor
*/
private CreateQueryGroupAction() {
super(NAME, CreateQueryGroupResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentParser;
import org.joda.time.Instant;

import java.io.IOException;

/**
* A request for create QueryGroup
* User input schema:
* {
* "name": "analytics",
* "resiliency_mode": "enforced",
* "resource_limits": {
* "cpu" : 0.4,
* "memory" : 0.2
* }
* }
*
* @opensearch.experimental
*/
public class CreateQueryGroupRequest extends ActionRequest {
private final QueryGroup queryGroup;

/**
* Constructor for CreateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public CreateQueryGroupRequest(QueryGroup queryGroup) {
this.queryGroup = queryGroup;
}

/**
* Constructor for CreateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public CreateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
queryGroup = new QueryGroup(in);
}

/**
* Generate a CreateQueryGroupRequest from XContent
* @param parser - A {@link XContentParser} object
*/
public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws IOException {
QueryGroup.Builder builder = QueryGroup.Builder.fromXContent(parser);
return new CreateQueryGroupRequest(builder._id(UUIDs.randomBase64UUID()).updatedAt(Instant.now().getMillis()).build());
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
queryGroup.writeTo(out);
}

/**
* QueryGroup getter
*/
public QueryGroup getQueryGroup() {
return queryGroup;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Response for the create API for QueryGroup
*
* @opensearch.experimental
*/
public class CreateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject {
private final QueryGroup queryGroup;
private final RestStatus restStatus;

/**
* Constructor for CreateQueryGroupResponse
* @param queryGroup - The QueryGroup to be included in the response
* @param restStatus - The restStatus for the response
*/
public CreateQueryGroupResponse(final QueryGroup queryGroup, RestStatus restStatus) {
this.queryGroup = queryGroup;
this.restStatus = restStatus;
}

/**
* Constructor for CreateQueryGroupResponse
* @param in - A {@link StreamInput} object
*/
public CreateQueryGroupResponse(StreamInput in) throws IOException {
queryGroup = new QueryGroup(in);
restStatus = RestStatus.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
queryGroup.writeTo(out);
RestStatus.writeTo(out, restStatus);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return queryGroup.toXContent(builder, params);
}

/**
* queryGroup getter
*/
public QueryGroup getQueryGroup() {
return queryGroup;
}

/**
* restStatus getter
*/
public RestStatus getRestStatus() {
return restStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

/**
* Transport action to create QueryGroup
*
* @opensearch.experimental
*/
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final ThreadPool threadPool;
private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportCreateQueryGroupAction
*
* @param actionName - action name
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param threadPool - a {@link ThreadPool} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
this.threadPool = threadPool;
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener<CreateQueryGroupResponse> listener) {
threadPool.executor(ThreadPool.Names.SAME)
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package for the action classes of WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm.action;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Base package for WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm;
Loading

0 comments on commit d442f7c

Please sign in to comment.