From ca32fdabfe7108217aec2d109d710696a9fdfe10 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Tue, 7 Mar 2023 20:25:05 +0100 Subject: [PATCH] Created inital version of Rest layer for workflows Signed-off-by: Stevan Buzejic --- .../org/opensearch/alerting/AlertingPlugin.kt | 7 +- .../model/workflow/WorkflowRunResult.kt | 3 - .../resthandler/RestDeleteWorkflowAction.kt | 59 ++++ .../resthandler/RestGetWorkflowAction.kt | 19 +- .../resthandler/RestIndexWorkflowAction.kt | 98 ++++++ .../TransportIndexCompositeWorkflowAction.kt | 11 + .../org/opensearch/alerting/TestHelpers.kt | 3 +- .../opensearch/alerting/WorkflowMonitorIT.kt | 40 +-- .../alerting/WorkflowRestTestCase.kt | 57 +++- .../opensearch/alerting/WorkflowRunnerIT.kt | 10 +- .../alerting/resthandler/WorkflowRestApiIT.kt | 318 +++++++++++++++++- 11 files changed, 567 insertions(+), 58 deletions(-) delete mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4ca05c9db..5e28e49c8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -27,6 +27,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction +import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction import org.opensearch.alerting.resthandler.RestExecuteMonitorAction import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction import org.opensearch.alerting.resthandler.RestGetAlertsAction @@ -37,6 +38,7 @@ import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction +import org.opensearch.alerting.resthandler.RestIndexWorkflowAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction @@ -129,6 +131,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" + @JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows" @JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" @JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts" @JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups" @@ -162,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetMonitorAction(), RestDeleteMonitorAction(), RestIndexMonitorAction(), + RestIndexWorkflowAction(), RestSearchMonitorAction(settings, clusterService), RestExecuteMonitorAction(), RestExecuteWorkflowAction(), @@ -174,7 +178,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetDestinationsAction(), RestGetAlertsAction(), RestGetFindingsAction(), - RestGetWorkflowAction() + RestGetWorkflowAction(), + RestDeleteWorkflowAction() ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt deleted file mode 100644 index cc6b61745..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/workflow/WorkflowRunResult.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.opensearch.alerting.model.workflow - -data class WorkflowRunResult(private val someArg: String) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt new file mode 100644 index 000000000..0a74c27b2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +/** + * This class consists of the REST handler to delete workflows. + */ +class RestDeleteWorkflowAction : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "delete_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route( + RestRequest.Method.DELETE, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") + + val workflowId = request.param("workflowID") + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/$workflowId") + + val refreshPolicy = + WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value)) + val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, refreshPolicy) + + return RestChannelConsumer { channel -> + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest, + RestToXContentListener(channel) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt index 9697e5f69..b3449fd70 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt @@ -19,7 +19,7 @@ import org.opensearch.rest.action.RestToXContentListener import org.opensearch.search.fetch.subphase.FetchSourceContext /** - * This class consists of the REST handler to retrieve a monitor . + * This class consists of the REST handler to retrieve a workflow . */ class RestGetWorkflowAction : BaseRestHandler() { @@ -30,14 +30,23 @@ class RestGetWorkflowAction : BaseRestHandler() { } override fun routes(): List { - return listOf() + return listOf( + RestHandler.Route( + RestRequest.Method.GET, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ), + RestHandler.Route( + RestRequest.Method.HEAD, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") - val monitorId = request.param("workflowID") - if (monitorId == null || monitorId.isEmpty()) { + val workflowId = request.param("workflowID") + if (workflowId == null || workflowId.isEmpty()) { throw IllegalArgumentException("missing id") } @@ -46,7 +55,7 @@ class RestGetWorkflowAction : BaseRestHandler() { srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE } val getWorkflowRequest = - GetWorkflowRequest(monitorId, RestActions.parseVersion(request), request.method(), srcContext) + GetWorkflowRequest(workflowId, RestActions.parseVersion(request), request.method(), srcContext) return RestChannelConsumer { channel -> client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt new file mode 100644 index 000000000..b3b9f0d5d --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.alerting.resthandler + +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.IF_PRIMARY_TERM +import org.opensearch.alerting.util.IF_SEQ_NO +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.RestStatus +import org.opensearch.rest.action.RestResponseListener +import java.io.IOException +import java.time.Instant + +/** + * Rest handlers to create and update workflows. + */ +class RestIndexWorkflowAction : BaseRestHandler() { + + override fun getName(): String { + return "index_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI), + RestHandler.Route( + RestRequest.Method.PUT, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val id = request.param("workflowID", Workflow.NO_ID) + if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) { + throw IllegalArgumentException("Missing workflow ID") + } + + // Validate request by parsing JSON to Monitor + val xcp = request.contentParser() + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + val rbacRoles = request.contentParser().map()["rbac_roles"] as List? + + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + val refreshPolicy = if (request.hasParam(REFRESH)) { + WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) + } else { + WriteRequest.RefreshPolicy.IMMEDIATE + } + val workflowRequest = + IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles) + + return RestChannelConsumer { channel -> + client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method())) + } + } + + private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener { + return object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(response: IndexWorkflowResponse): RestResponse { + var returnStatus = RestStatus.CREATED + if (restMethod == RestRequest.Method.PUT) + returnStatus = RestStatus.OK + + val restResponse = + BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)) + if (returnStatus == RestStatus.CREATED) { + val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}" + restResponse.addHeader("Location", location) + } + return restResponse + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt index bd2992815..a2aba5cf7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexCompositeWorkflowAction.kt @@ -528,6 +528,9 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( reqMonitorIds.remove(it.id) } if (reqMonitorIds.isNotEmpty()) { + log.error("monitorIds: " + monitorIds.joinToString()) + log.error("delegateMonitors: " + delegateMonitors.joinToString { it.id }) + log.error("reqMonitorIds: " + reqMonitorIds.joinToString()) throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids"))) } } @@ -552,6 +555,14 @@ class TransportIndexCompositeWorkflowAction @Inject constructor( monitors.add(monitor as Monitor) } } + if (monitors.isEmpty()) { + val searchSource1 = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + val searchRequest1 = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).source(searchSource1) + val response1: SearchResponse = client.suspendUntil { client.search(searchRequest1, it) } + + print(response1) + } + return monitors } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index d06f6fa91..03632c523 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -223,7 +223,7 @@ fun randomDocumentLevelMonitor( ) } -fun randomWorkflowMonitor( +fun randomWorkflow( id: String = Workflow.NO_ID, monitorIds: List, name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), @@ -386,6 +386,7 @@ fun randomScript(source: String = "return " + OpenSearchRestTestCase.randomBoole val ADMIN = "admin" val ALERTING_BASE_URI = "/_plugins/_alerting/monitors" +val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows" val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors" val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 56aab4537..434e8297c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -50,7 +50,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse1 = createMonitor(monitor1)!! val monitorResponse2 = createMonitor(monitor2)!! - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) ) @@ -121,7 +121,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse1 = createMonitor(monitor1)!! val monitorResponse2 = createMonitor(monitor2)!! - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) ) @@ -146,7 +146,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse3 = createMonitor(monitor3)!! val updatedWorkflowResponse = upsertWorkflow( - randomWorkflowMonitor( + randomWorkflow( monitorIds = listOf(monitorResponse1.id, monitorResponse2.id, monitorResponse3.id) ), workflowResponse.id, @@ -226,7 +226,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse1 = createMonitor(monitor1)!! val monitorResponse2 = createMonitor(monitor2)!! - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) ) @@ -240,7 +240,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { assertNotNull(workflowById) val updatedWorkflowResponse = upsertWorkflow( - randomWorkflowMonitor( + randomWorkflow( monitorIds = listOf(monitorResponse1.id) ), workflowResponse.id, @@ -284,7 +284,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse = createMonitor(monitor)!! - val workflowRequest = randomWorkflowMonitor( + val workflowRequest = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) @@ -366,7 +366,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse = createMonitor(monitor)!! - val workflowRequest = randomWorkflowMonitor( + val workflowRequest = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflowRequest)!! @@ -403,7 +403,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse = createMonitor(monitor)!! - val workflowRequest = randomWorkflowMonitor( + val workflowRequest = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) @@ -467,7 +467,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } fun `test create workflow without delegate failure`() { - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = Collections.emptyList() ) try { @@ -500,14 +500,14 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse1 = createMonitor(monitor1)!! val monitorResponse2 = createMonitor(monitor2)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) ) val workflowResponse = upsertWorkflow(workflow)!! assertNotNull("Workflow creation failed", workflowResponse) - workflow = randomWorkflowMonitor( + workflow = randomWorkflow( id = workflowResponse.id, monitorIds = Collections.emptyList() ) @@ -524,7 +524,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } fun `test create workflow duplicate delegate failure`() { - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = listOf("1", "1", "2") ) try { @@ -551,14 +551,14 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! assertNotNull("Workflow creation failed", workflowResponse) - workflow = randomWorkflowMonitor( + workflow = randomWorkflow( id = workflowResponse.id, monitorIds = listOf("1", "1", "2") ) @@ -586,7 +586,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - val workflow = randomWorkflowMonitor( + val workflow = randomWorkflow( monitorIds = listOf("-1", monitorResponse.id) ) try { @@ -613,13 +613,13 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! assertNotNull("Workflow creation failed", workflowResponse) - workflow = randomWorkflowMonitor( + workflow = randomWorkflow( id = workflowResponse.id, monitorIds = listOf("-1", monitorResponse.id) ) @@ -669,7 +669,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -730,7 +730,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -792,7 +792,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt index 9d0351ee5..52d96d817 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt @@ -1,19 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.alerting import org.apache.http.HttpEntity +import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHeader import org.opensearch.client.RestClient import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.util.string import org.opensearch.rest.RestStatus -class WorkflowRestTestCase : AlertingRestTestCase() { +open class WorkflowRestTestCase : AlertingRestTestCase() { + + protected fun createRandomWorkflow(refresh: Boolean = false, monitorIds: List): Workflow { + val workflow = randomWorkflow(monitorIds = monitorIds) + return createWorkflow(workflow, refresh) + } + private fun createWorkflowEntityWithBackendRoles(workflow: Workflow, rbacRoles: List?): HttpEntity { if (rbacRoles == null) { return workflow.toHttpEntity() @@ -30,22 +46,23 @@ class WorkflowRestTestCase : AlertingRestTestCase() { workflow: Workflow, rbacRoles: List? = null, refresh: Boolean = true - ) { + ): Workflow { val response = client.makeRequest( "POST", "$WORKFLOW_ALERTING_BASE_URI?refresh=$refresh", emptyMap(), createWorkflowEntityWithBackendRoles(workflow, rbacRoles) ) assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus()) - val monitorJson = JsonXContent.jsonXContent.createParser( + val workflowJson = JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.entity.content ).map() - assertUserNull(monitorJson as HashMap) + assertUserNull(workflowJson as HashMap) + return workflow.copy(id = workflowJson["_id"] as String) } - protected fun createWorkflow(workflow: Workflow, refresh: Boolean = true) { - createWorkflowWithClient(client(), workflow, emptyList(), refresh) + protected fun createWorkflow(workflow: Workflow, refresh: Boolean = true): Workflow { + return createWorkflowWithClient(client(), workflow, emptyList(), refresh) } protected fun Workflow.toHttpEntity(): HttpEntity { @@ -57,4 +74,30 @@ class WorkflowRestTestCase : AlertingRestTestCase() { return shuffleXContent(toXContent(builder, ToXContent.EMPTY_PARAMS)).string() } -} \ No newline at end of file + protected fun getWorkflow(workflowId: String, header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")): Workflow { + val response = client().makeRequest("GET", "$WORKFLOW_ALERTING_BASE_URI/$workflowId", null, header) + assertEquals("Unable to get workflow $workflowId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var version: Long = 0 + lateinit var workflow: Workflow + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + "_id" -> id = parser.text() + "_version" -> version = parser.longValue() + "workflow" -> workflow = Workflow.parse(parser) + } + } + + assertUserNull(workflow) + return workflow.copy(id = id, version = version) + } + + protected fun Workflow.relativeUrl() = "$WORKFLOW_ALERTING_BASE_URI/$id" +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt index e7f4d807e..770b0e950 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt @@ -70,7 +70,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val monitorResponse2 = createMonitor(monitor2)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id, monitorResponse2.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -188,7 +188,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! // 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor) - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -325,7 +325,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val queryMonitorResponse = createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! // 1. docMonitor (chainedFinding = null) 2. bucketMonitor (chainedFinding = docMonitor) 3. docMonitor (chainedFinding = bucketMonitor) 4. queryMonitor (chainedFinding = docMonitor 3) - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id, docLevelMonitorResponse1.id, queryMonitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -419,7 +419,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { ) val monitorResponse = createMonitor(monitor)!! - var workflow = randomWorkflowMonitor( + var workflow = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflow)!! @@ -449,7 +449,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val monitorResponse = createMonitor(monitor)!! - val workflowRequest = randomWorkflowMonitor( + val workflowRequest = randomWorkflow( monitorIds = listOf(monitorResponse.id) ) val workflowResponse = upsertWorkflow(workflowRequest)!! diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 61594b35a..9b86ba9a4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -5,30 +5,280 @@ package org.opensearch.alerting.resthandler -import org.opensearch.alerting.ALERTING_BASE_URI -import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.ALWAYS_RUN +import org.opensearch.alerting.WORKFLOW_ALERTING_BASE_URI +import org.opensearch.alerting.WorkflowRestTestCase import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomBucketLevelMonitor +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomWorkflow +import org.opensearch.alerting.randomWorkflowMonitorWithDelegates import org.opensearch.client.ResponseException +import org.opensearch.commons.alerting.model.ChainedFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.rest.RestStatus import org.opensearch.test.junit.annotations.TestLogging +import java.util.Collections @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") -class WorkflowRestApiIT : AlertingRestTestCase() { +class WorkflowRestApiIT : WorkflowRestTestCase() { + + fun `test create workflow success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + } + + fun `test create workflow with different monitor types success`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val bucketLevelMonitor = randomBucketLevelMonitor() + val bucketLevelMonitorResponse = createMonitor(bucketLevelMonitor) + + val workflow = randomWorkflow( + monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val workflowById = getWorkflow(createdId) + assertNotNull(workflowById) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", docLevelMonitorResponse.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", bucketLevelMonitorResponse.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", docLevelMonitorResponse.id, delegate2.chainedFindings!!.monitorId + ) + } + + fun `test create workflow without delegate failure`() { + val workflow = randomWorkflow( + monitorIds = Collections.emptyList() + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test create workflow duplicate delegate failure`() { + val workflow = randomWorkflow( + monitorIds = listOf("1", "1", "2") + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test create workflow delegate monitor doesn't exist failure`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf("-1", docLevelMonitorResponse.id) + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test create workflow sequence order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test create workflow chained findings monitor not in sequence failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedFindings("monitor-x")) + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow chained findings order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedFindings("monitor-2")) + ) + val workflow = randomWorkflowMonitorWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } @Throws(Exception::class) fun `test getting a workflow`() { - val workflow = createRandomMonitor() + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + val storedMonitor = getMonitor(monitor.id) + + assertEquals("Indexed and retrieved monitor differ", monitor, storedMonitor) - val storedMonitor = getMonitor(workflow.id) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) - assertEquals("Indexed and retrieved monitor differ", workflow, storedMonitor) + val storedWorkflow = getWorkflow(workflow.id) + + assertEquals("Indexed and retrieved workflow differ", workflow.id, storedWorkflow.id) + val delegates = (storedWorkflow.inputs[0] as CompositeInput).sequence.delegates + assertEquals("Delegate list not correct", 1, delegates.size) + assertEquals("Delegate order id not correct", 1, delegates[0].order) + assertEquals("Delegate id list not correct", monitor.id, delegates[0].monitorId) } @Throws(Exception::class) - fun `test getting a monitor that doesn't exist`() { + fun `test getting a workflow that doesn't exist`() { try { - getMonitor(randomAlphaOfLength(20)) + getWorkflow(randomAlphaOfLength(20)) fail("expected response exception") } catch (e: ResponseException) { assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) @@ -36,16 +286,52 @@ class WorkflowRestApiIT : AlertingRestTestCase() { } @Throws(Exception::class) - fun `test checking if a monitor exists`() { - val monitor = createRandomMonitor() + fun `test checking if a workflow exists`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + // val monitor = createMonitor(docLevelMonitor) + val storedMonitor = getMonitor(monitor.id) + assertEquals("Indexed and retrieved monitor differ", monitor, storedMonitor) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) - val headResponse = client().makeRequest("HEAD", monitor.relativeUrl()) - assertEquals("Unable to HEAD monitor", RestStatus.OK, headResponse.restStatus()) - assertNull("Response contains unexpected body", headResponse.entity) + val headResponse = client().makeRequest("HEAD", workflow.relativeUrl()) + assertEquals("Unable to HEAD workflow", RestStatus.OK, headResponse.restStatus()) + assertNull("Workflow response contains unexpected body", headResponse.entity) } - fun `test checking if a non-existent monitor exists`() { - val headResponse = client().makeRequest("HEAD", "$ALERTING_BASE_URI/foobarbaz") + fun `test checking if a non-existent workflow exists`() { + val headResponse = client().makeRequest("HEAD", "$WORKFLOW_ALERTING_BASE_URI/foobarbaz") assertEquals("Unexpected status", RestStatus.NOT_FOUND, headResponse.restStatus()) } -} \ No newline at end of file + + fun `test delete workflow`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl()) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } +}