diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ff52d69e8..25add3e16 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -27,6 +27,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction +import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction import org.opensearch.alerting.resthandler.RestExecuteMonitorAction import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction import org.opensearch.alerting.resthandler.RestGetAlertsAction @@ -35,7 +36,9 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction import org.opensearch.alerting.resthandler.RestGetEmailGroupAction import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction +import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction +import org.opensearch.alerting.resthandler.RestIndexWorkflowAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction @@ -128,6 +131,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" + @JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows" @JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" @JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts" @JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups" @@ -161,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetMonitorAction(), RestDeleteMonitorAction(), RestIndexMonitorAction(), + RestIndexWorkflowAction(), RestSearchMonitorAction(settings, clusterService), RestExecuteMonitorAction(), RestExecuteWorkflowAction(), @@ -172,7 +177,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetEmailGroupAction(), RestGetDestinationsAction(), RestGetAlertsAction(), - RestGetFindingsAction() + RestGetFindingsAction(), + RestGetWorkflowAction(), + RestDeleteWorkflowAction() ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt new file mode 100644 index 000000000..92b9d5c61 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +/** + * This class consists of the REST handler to delete workflows. + */ +class RestDeleteWorkflowAction : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "delete_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route( + RestRequest.Method.DELETE, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") + + val workflowId = request.param("workflowID") + val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false) + log.debug("${request.method()} ${request.uri()}") + + val refreshPolicy = + WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value)) + val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors, refreshPolicy) + + return RestChannelConsumer { channel -> + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest, + RestToXContentListener(channel) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt new file mode 100644 index 000000000..b3449fd70 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.context +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestActions +import org.opensearch.rest.action.RestToXContentListener +import org.opensearch.search.fetch.subphase.FetchSourceContext + +/** + * This class consists of the REST handler to retrieve a workflow . + */ +class RestGetWorkflowAction : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "get_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route( + RestRequest.Method.GET, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ), + RestHandler.Route( + RestRequest.Method.HEAD, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") + + val workflowId = request.param("workflowID") + if (workflowId == null || workflowId.isEmpty()) { + throw IllegalArgumentException("missing id") + } + + var srcContext = context(request) + if (request.method() == RestRequest.Method.HEAD) { + srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE + } + val getWorkflowRequest = + GetWorkflowRequest(workflowId, RestActions.parseVersion(request), request.method(), srcContext) + return RestChannelConsumer { + channel -> + client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel)) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt new file mode 100644 index 000000000..1d6831dc9 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.alerting.resthandler + +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IF_PRIMARY_TERM +import org.opensearch.alerting.util.IF_SEQ_NO +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.RestStatus +import org.opensearch.rest.action.RestResponseListener +import java.io.IOException +import java.time.Instant + +/** + * Rest handlers to create and update workflows. + */ +class RestIndexWorkflowAction : BaseRestHandler() { + + override fun getName(): String { + return "index_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI), + RestHandler.Route( + RestRequest.Method.PUT, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val id = request.param("workflowID", Workflow.NO_ID) + if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) { + throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID")) + } + + // Validate request by parsing JSON to Monitor + val xcp = request.contentParser() + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + val rbacRoles = request.contentParser().map()["rbac_roles"] as List? + + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + val refreshPolicy = if (request.hasParam(REFRESH)) { + WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) + } else { + WriteRequest.RefreshPolicy.IMMEDIATE + } + val workflowRequest = + IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles) + + return RestChannelConsumer { channel -> + client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method())) + } + } + + private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener { + return object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(response: IndexWorkflowResponse): RestResponse { + var returnStatus = RestStatus.CREATED + if (restMethod == RestRequest.Method.PUT) + returnStatus = RestStatus.OK + + val restResponse = + BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)) + if (returnStatus == RestStatus.CREATED) { + val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}" + restResponse.addHeader("Location", location) + } + return restResponse + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index a4fa9ce76..3e453acde 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchException import org.opensearch.OpenSearchStatusException import org.opensearch.ResourceAlreadyExistsException @@ -206,7 +207,7 @@ class TransportIndexWorkflowAction @Inject constructor( override fun onFailure(t: Exception) { // https://github.com/opensearch-project/alerting/issues/646 - if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { scope.launch { // Wait for the yellow status val request = ClusterHealthRequest() @@ -321,7 +322,6 @@ class TransportIndexWorkflowAction @Inject constructor( } private suspend fun indexWorkflow() { - if (user != null) { // Use the backend roles which is an intersection of the requested backend roles and the user's backend roles. // Admins can pass in any backend role. Also if no backend role is passed in, all the user's backend roles are used. @@ -555,6 +555,9 @@ class TransportIndexWorkflowAction @Inject constructor( reqMonitorIds.remove(it.id) } if (reqMonitorIds.isNotEmpty()) { + log.error("monitorIds: " + monitorIds.joinToString()) + log.error("delegateMonitors: " + delegateMonitors.joinToString { it.id }) + log.error("reqMonitorIds: " + reqMonitorIds.joinToString()) throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids"))) } } @@ -579,6 +582,14 @@ class TransportIndexWorkflowAction @Inject constructor( monitors.add(monitor as Monitor) } } + if (monitors.isEmpty()) { + val searchSource1 = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + val searchRequest1 = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).source(searchSource1) + val response1: SearchResponse = client.suspendUntil { client.search(searchRequest1, it) } + + print(response1) + } + return monitors } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 0723fd554..f08713d8f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -386,6 +386,7 @@ fun randomScript(source: String = "return " + OpenSearchRestTestCase.randomBoole val ADMIN = "admin" val ALERTING_BASE_URI = "/_plugins/_alerting/monitors" +val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows" val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors" val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt new file mode 100644 index 000000000..52d96d817 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRestTestCase.kt @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.apache.http.HttpEntity +import org.apache.http.HttpHeaders +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHeader +import org.opensearch.client.RestClient +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.string +import org.opensearch.rest.RestStatus + +open class WorkflowRestTestCase : AlertingRestTestCase() { + + protected fun createRandomWorkflow(refresh: Boolean = false, monitorIds: List): Workflow { + val workflow = randomWorkflow(monitorIds = monitorIds) + return createWorkflow(workflow, refresh) + } + + private fun createWorkflowEntityWithBackendRoles(workflow: Workflow, rbacRoles: List?): HttpEntity { + if (rbacRoles == null) { + return workflow.toHttpEntity() + } + val temp = workflow.toJsonString() + val toReplace = temp.lastIndexOf("}") + val rbacString = rbacRoles.joinToString { "\"$it\"" } + val jsonString = temp.substring(0, toReplace) + ", \"rbac_roles\": [$rbacString] }" + return StringEntity(jsonString, ContentType.APPLICATION_JSON) + } + + protected fun createWorkflowWithClient( + client: RestClient, + workflow: Workflow, + rbacRoles: List? = null, + refresh: Boolean = true + ): Workflow { + val response = client.makeRequest( + "POST", "$WORKFLOW_ALERTING_BASE_URI?refresh=$refresh", emptyMap(), + createWorkflowEntityWithBackendRoles(workflow, rbacRoles) + ) + assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus()) + + val workflowJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, + response.entity.content + ).map() + assertUserNull(workflowJson as HashMap) + return workflow.copy(id = workflowJson["_id"] as String) + } + + protected fun createWorkflow(workflow: Workflow, refresh: Boolean = true): Workflow { + return createWorkflowWithClient(client(), workflow, emptyList(), refresh) + } + + protected fun Workflow.toHttpEntity(): HttpEntity { + return StringEntity(toJsonString(), ContentType.APPLICATION_JSON) + } + + private fun Workflow.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return shuffleXContent(toXContent(builder, ToXContent.EMPTY_PARAMS)).string() + } + + protected fun getWorkflow(workflowId: String, header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")): Workflow { + val response = client().makeRequest("GET", "$WORKFLOW_ALERTING_BASE_URI/$workflowId", null, header) + assertEquals("Unable to get workflow $workflowId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var version: Long = 0 + lateinit var workflow: Workflow + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + "_id" -> id = parser.text() + "_version" -> version = parser.longValue() + "workflow" -> workflow = Workflow.parse(parser) + } + } + + assertUserNull(workflow) + return workflow.copy(id = id, version = version) + } + + protected fun Workflow.relativeUrl() = "$WORKFLOW_ALERTING_BASE_URI/$id" +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt new file mode 100644 index 000000000..8da95822e --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -0,0 +1,571 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.opensearch.alerting.ALWAYS_RUN +import org.opensearch.alerting.WORKFLOW_ALERTING_BASE_URI +import org.opensearch.alerting.WorkflowRestTestCase +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomBucketLevelMonitor +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomWorkflow +import org.opensearch.alerting.randomWorkflowWithDelegates +import org.opensearch.client.ResponseException +import org.opensearch.commons.alerting.model.ChainedFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.rest.RestStatus +import org.opensearch.test.junit.annotations.TestLogging +import java.util.Collections + +@TestLogging("level:DEBUG", reason = "Debug for tests.") +@Suppress("UNCHECKED_CAST") +class WorkflowRestApiIT : WorkflowRestTestCase() { + + fun `test create workflow success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + } + + fun `test create workflow with different monitor types success`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val bucketLevelMonitor = randomBucketLevelMonitor() + val bucketLevelMonitorResponse = createMonitor(bucketLevelMonitor) + + val workflow = randomWorkflow( + monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val workflowById = getWorkflow(createdId) + assertNotNull(workflowById) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", docLevelMonitorResponse.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", bucketLevelMonitorResponse.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", docLevelMonitorResponse.id, delegate2.chainedFindings!!.monitorId + ) + } + + fun `test create workflow without delegate failure`() { + val workflow = randomWorkflow( + monitorIds = Collections.emptyList() + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test create workflow duplicate delegate failure`() { + val workflow = randomWorkflow( + monitorIds = listOf("1", "1", "2") + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test create workflow delegate monitor doesn't exist failure`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf("-1", docLevelMonitorResponse.id) + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test create workflow sequence order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test create workflow chained findings monitor not in sequence failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedFindings("monitor-x")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow chained findings order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedFindings("monitor-2")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } + + fun `test update workflow add monitor success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse2 = createMonitor(monitor2) + + val updatedWorkflow = randomWorkflow( + id = createdId, + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + + val updateResponse = client().makeRequest("PUT", updatedWorkflow.relativeUrl(), emptyMap(), updatedWorkflow.toHttpEntity()) + + assertEquals("Update workflow failed", RestStatus.OK, updateResponse.restStatus()) + + val updateResponseBody = updateResponse.asMap() + val updatedId = updateResponseBody["_id"] as String + val updatedVersion = updateResponseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, updatedId) + assertTrue("incorrect version", updatedVersion > 0) + + val workflowById = getWorkflow(updatedId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse.id, delegate2.chainedFindings!!.monitorId + ) + } + + fun `test update workflow remove monitor success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse2 = createMonitor(monitor2) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + var workflowById = getWorkflow(createdId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + var delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val updatedWorkflow = randomWorkflow( + id = createdId, + monitorIds = listOf(monitorResponse.id) + ) + + val updateResponse = client().makeRequest("PUT", updatedWorkflow.relativeUrl(), emptyMap(), updatedWorkflow.toHttpEntity()) + + assertEquals("Update workflow failed", RestStatus.OK, updateResponse.restStatus()) + + val updateResponseBody = updateResponse.asMap() + val updatedId = updateResponseBody["_id"] as String + val updatedVersion = updateResponseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, updatedId) + assertTrue("incorrect version", updatedVersion > 0) + + workflowById = getWorkflow(updatedId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse.id, delegate1.monitorId) + } + + @Throws(Exception::class) + fun `test getting a workflow`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + val storedMonitor = getMonitor(monitor.id) + + assertEquals("Indexed and retrieved monitor differ", monitor, storedMonitor) + + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + + val storedWorkflow = getWorkflow(workflow.id) + + assertEquals("Indexed and retrieved workflow differ", workflow.id, storedWorkflow.id) + val delegates = (storedWorkflow.inputs[0] as CompositeInput).sequence.delegates + assertEquals("Delegate list not correct", 1, delegates.size) + assertEquals("Delegate order id not correct", 1, delegates[0].order) + assertEquals("Delegate id list not correct", monitor.id, delegates[0].monitorId) + } + + @Throws(Exception::class) + fun `test getting a workflow that doesn't exist`() { + try { + getWorkflow(randomAlphaOfLength(20)) + fail("expected response exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + @Throws(Exception::class) + fun `test checking if a workflow exists`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + // val monitor = createMonitor(docLevelMonitor) + val storedMonitor = getMonitor(monitor.id) + assertEquals("Indexed and retrieved monitor differ", monitor, storedMonitor) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + + val headResponse = client().makeRequest("HEAD", workflow.relativeUrl()) + assertEquals("Unable to HEAD workflow", RestStatus.OK, headResponse.restStatus()) + assertNull("Workflow response contains unexpected body", headResponse.entity) + } + + fun `test checking if a non-existent workflow exists`() { + val headResponse = client().makeRequest("HEAD", "$WORKFLOW_ALERTING_BASE_URI/foobarbaz") + assertEquals("Unexpected status", RestStatus.NOT_FOUND, headResponse.restStatus()) + } + + fun `test delete workflow`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl()) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete workflow delete delegate monitors`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl().plus("?deleteDelegateMonitors=true")) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + + // Verify that delegate monitor is deleted + try { + getMonitor(monitor.id) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Monitor not found.") + ) + } + } + } + + fun `test delete workflow preserve delegate monitors`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl().plus("?deleteDelegateMonitors=false")) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + + // Verify that delegate monitor is not deleted + val delegateMonitor = getMonitor(monitor.id) + assertNotNull(delegateMonitor) + } + + @Throws(Exception::class) + fun `test deleting a workflow that doesn't exist`() { + try { + client().makeRequest("DELETE", "$WORKFLOW_ALERTING_BASE_URI/foobarbaz") + fail("expected 404 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } +}