Skip to content

Commit

Permalink
Created inital version of Rest layer for workflows
Browse files Browse the repository at this point in the history
Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz committed Mar 7, 2023
1 parent a8b6913 commit ca32fda
Show file tree
Hide file tree
Showing 11 changed files with 567 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
Expand All @@ -37,6 +38,7 @@ import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand Down Expand Up @@ -129,6 +131,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
Expand Down Expand Up @@ -162,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestExecuteWorkflowAction(),
Expand All @@ -174,7 +178,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction()
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
)
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import java.io.IOException

/**
* This class consists of the REST handler to delete workflows.
*/
class RestDeleteWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "delete_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.DELETE,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/$workflowId")

val refreshPolicy =
WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value))
val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, refreshPolicy)

return RestChannelConsumer { channel ->
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest,
RestToXContentListener(channel)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.rest.action.RestToXContentListener
import org.opensearch.search.fetch.subphase.FetchSourceContext

/**
* This class consists of the REST handler to retrieve a monitor .
* This class consists of the REST handler to retrieve a workflow .
*/
class RestGetWorkflowAction : BaseRestHandler() {

Expand All @@ -30,14 +30,23 @@ class RestGetWorkflowAction : BaseRestHandler() {
}

override fun routes(): List<RestHandler.Route> {
return listOf()
return listOf(
RestHandler.Route(
RestRequest.Method.GET,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
),
RestHandler.Route(
RestRequest.Method.HEAD,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val monitorId = request.param("workflowID")
if (monitorId == null || monitorId.isEmpty()) {
val workflowId = request.param("workflowID")
if (workflowId == null || workflowId.isEmpty()) {
throw IllegalArgumentException("missing id")
}

Expand All @@ -46,7 +55,7 @@ class RestGetWorkflowAction : BaseRestHandler() {
srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
val getWorkflowRequest =
GetWorkflowRequest(monitorId, RestActions.parseVersion(request), request.method(), srcContext)
GetWorkflowRequest(workflowId, RestActions.parseVersion(request), request.method(), srcContext)
return RestChannelConsumer {
channel ->
client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.alerting.resthandler

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestChannel
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant

/**
* Rest handlers to create and update workflows.
*/
class RestIndexWorkflowAction : BaseRestHandler() {

override fun getName(): String {
return "index_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI),
RestHandler.Route(
RestRequest.Method.PUT,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("workflowID", Workflow.NO_ID)
if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) {
throw IllegalArgumentException("Missing workflow ID")
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}
val workflowRequest =
IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles)

return RestChannelConsumer { channel ->
client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method()))
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener<IndexWorkflowResponse> {
return object : RestResponseListener<IndexWorkflowResponse>(channel) {
@Throws(Exception::class)
override fun buildResponse(response: IndexWorkflowResponse): RestResponse {
var returnStatus = RestStatus.CREATED
if (restMethod == RestRequest.Method.PUT)
returnStatus = RestStatus.OK

val restResponse =
BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (returnStatus == RestStatus.CREATED) {
val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ class TransportIndexCompositeWorkflowAction @Inject constructor(
reqMonitorIds.remove(it.id)
}
if (reqMonitorIds.isNotEmpty()) {
log.error("monitorIds: " + monitorIds.joinToString())
log.error("delegateMonitors: " + delegateMonitors.joinToString { it.id })
log.error("reqMonitorIds: " + reqMonitorIds.joinToString())
throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids")))
}
}
Expand All @@ -552,6 +555,14 @@ class TransportIndexCompositeWorkflowAction @Inject constructor(
monitors.add(monitor as Monitor)
}
}
if (monitors.isEmpty()) {
val searchSource1 = SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
val searchRequest1 = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).source(searchSource1)
val response1: SearchResponse = client.suspendUntil { client.search(searchRequest1, it) }

print(response1)
}

return monitors
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ fun randomDocumentLevelMonitor(
)
}

fun randomWorkflowMonitor(
fun randomWorkflow(
id: String = Workflow.NO_ID,
monitorIds: List<String>,
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
Expand Down Expand Up @@ -386,6 +386,7 @@ fun randomScript(source: String = "return " + OpenSearchRestTestCase.randomBoole

val ADMIN = "admin"
val ALERTING_BASE_URI = "/_plugins/_alerting/monitors"
val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows"
val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors"
val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
Expand Down
Loading

0 comments on commit ca32fda

Please sign in to comment.