From 06b427cf1a678b8962897485d3dca421fee9f5c8 Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Tue, 21 Feb 2023 12:40:58 -0800
Subject: [PATCH] Multiple indices support in DocLevelMonitorInput (#784) (#808)

Signed-off-by: Petar Dzepina
---
 .../org/opensearch/alerting/AlertingPlugin.kt | 1 +
 .../alerting/DocumentLevelMonitorRunner.kt | 41 ++--
 .../alerting/MonitorRunnerExecutionContext.kt | 2 +
 .../alerting/MonitorRunnerService.kt | 6 +
 .../alerting/alerts/AlertIndices.kt | 10 +-
 .../transport/TransportIndexMonitorAction.kt | 3 +-
 .../alerting/util/DocLevelMonitorQueries.kt | 30 ++-
 .../opensearch/alerting/util/IndexUtils.kt | 25 ++
 .../alerting/DocumentMonitorRunnerIT.kt | 32 +--
 .../alerting/MonitorDataSourcesIT.kt | 214 ++++++++++++++++++
 10 files changed, 304 insertions(+), 60 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
index 6cce97a21..b5870f21b 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
@@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
             .registerClusterService(clusterService)
             .registerClient(client)
             .registerNamedXContentRegistry(xContentRegistry)
+            .registerindexNameExpressionResolver(indexNameExpressionResolver)
             .registerScriptService(scriptService)
             .registerSettings(settings)
             .registerThreadPool(threadPool)
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index b7a9e7478..d89590f02 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -7,8 +7,6 @@ package org.opensearch.alerting
 
 import org.apache.logging.log4j.LogManager
 import org.opensearch.OpenSearchStatusException
-import org.opensearch.action.admin.indices.get.GetIndexRequest
-import org.opensearch.action.admin.indices.get.GetIndexResponse
 import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.index.IndexResponse
 import org.opensearch.action.search.SearchAction
@@ -23,9 +21,11 @@ import org.opensearch.alerting.model.MonitorRunResult
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
 import org.opensearch.alerting.util.AlertingException
+import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.defaultToPerExecutionAction
 import org.opensearch.alerting.util.getActionExecutionPolicy
 import org.opensearch.client.Client
+import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.routing.ShardRouting
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.bytes.BytesReference
@@ -92,7 +92,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         )
 
         val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
-        val index = docLevelMonitorInput.indices[0]
+        val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
 
         val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
         else monitorMetadata.lastRunContext.toMutableMap()
@@ -105,6 +105,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         val docsToQueries = mutableMapOf<String, MutableList<String>>()
 
         try {
+            // Resolve all passed indices to concrete
indices + val indices = IndexUtils.resolveAllIndices( + docLevelMonitorInput.indices, + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( monitor = monitor, @@ -113,12 +120,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexTimeout = monitorCtx.indexTimeout!! ) - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it) - } - val indices = getIndexResponse.indices() - // cleanup old indices that are not monitored anymore from the same monitor for (ind in updatedLastRunContext.keys) { if (!indices.contains(ind)) { @@ -129,8 +130,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indices.forEach { indexName -> // Prepare lastRunContext for each index val indexLastRunContext = lastRunContext.getOrPut(indexName) { - val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse) - MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently) + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(indexName) + ) + MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently) } // Prepare updatedLastRunContext for each index @@ -180,7 +186,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) } catch (e: Exception) { - logger.error("Failed to start Document-level-monitor $index. Error: ${e.message}", e) + logger.error("Failed to start Document-level-monitor ${monitor.name}. 
Error: ${e.message}", e) val alertingException = AlertingException.wrap(e) monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } @@ -379,9 +385,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { throw IOException("Invalid input with document-level-monitor.") } - val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput - if (docLevelMonitorInput.indices.size > 1) { - throw IOException("Only one index is supported with document-level-monitor.") + if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) { + throw IllegalArgumentException("DocLevelMonitorInput has no indices") } } @@ -408,13 +413,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // new index is monitored from the beginning of that index private fun createdRecently( monitor: Monitor, - index: String, periodStart: Instant, periodEnd: Instant, - getIndexResponse: GetIndexResponse + indexMetadata: IndexMetadata ): Boolean { val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart - return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli() + val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L + return indexCreationDate > lastExecutionTime.toEpochMilli() } /** diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 55624d66e..25bb42fc9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext( var clusterService: ClusterService? = null, var client: Client? = null, var xContentRegistry: NamedXContentRegistry? = null, + var indexNameExpressionResolver: IndexNameExpressionResolver? = null, var scriptService: ScriptService? = null, var settings: Settings? = null, var threadPool: ThreadPool? 
= null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 471fc1c83..d03db678f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -31,6 +31,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestin import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.isDocLevelMonitor import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.component.AbstractLifecycleComponent import org.opensearch.common.settings.Settings @@ -71,6 +72,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService { + this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver + return this + } + fun registerScriptService(scriptService: ScriptService): MonitorRunnerService { this.monitorCtx.scriptService = scriptService return this diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index 45d2cd9b2..176b6d1d4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.alerts import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.admin.cluster.state.ClusterStateRequest @@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT +import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent @@ -357,8 +359,12 @@ class AlertIndices( return try { val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } createIndexResponse.isAcknowledged - } catch (e: ResourceAlreadyExistsException) { - true + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { + true + } else { + throw AlertingException.wrap(t) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 5d20b51e4..b9d6ecd85 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import 
org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchException import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException @@ -303,7 +304,7 @@ class TransportIndexMonitorAction @Inject constructor( } override fun onFailure(t: Exception) { // https://github.com/opensearch-project/alerting/issues/646 - if (t is ResourceAlreadyExistsException && t.message?.contains("already exists") == true) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { scope.launch { // Wait for the yellow status val request = ClusterHealthRequest() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 29666bd2b..bc981407e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.delete.DeleteIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse @@ -26,6 +24,7 @@ import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorRunnerService.monitorCtx import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client @@ -86,8 +85,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return try { val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) } createIndexResponse.isAcknowledged - } catch (t: ResourceAlreadyExistsException) { - if (t.message?.contains("already exists") == true) { + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { true } else { throw t @@ -107,9 +106,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it) } if (!acknowledgedResponse.isAcknowledged) { - val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!" 
-            log.error(errorMessage)
-            throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR))
+            log.warn("Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!")
             }
         }
         val alias = dataSources.queryIndex
@@ -125,8 +122,8 @@
         return try {
             val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) }
             createIndexResponse.isAcknowledged
-        } catch (t: ResourceAlreadyExistsException) {
-            if (t.message?.contains("already exists") == true) {
+        } catch (t: Exception) {
+            if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
                 true
             } else {
                 throw t
@@ -206,16 +203,15 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
         indexTimeout: TimeValue
     ) {
         val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
-        val index = docLevelMonitorInput.indices[0]
         val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
-        val clusterState = clusterService.state()
+        val indices = IndexUtils.resolveAllIndices(
+            docLevelMonitorInput.indices,
+            monitorCtx.clusterService!!,
+            monitorCtx.indexNameExpressionResolver!!
+        )
-        val getIndexRequest = GetIndexRequest().indices(index)
-        val getIndexResponse: GetIndexResponse = client.suspendUntil {
-            client.admin().indices().getIndex(getIndexRequest, it)
-        }
-        val indices = getIndexResponse.indices()
+        val clusterState = clusterService.state()
         // Run through each backing index and apply appropriate mappings to query index
         indices?.forEach { indexName ->
@@ -387,7 +383,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
     /**
     * Adjusts max field limit index setting for query index if source index has higher limit.
-     * This will prevent max field limit exception, when applying mappings to query index
+     * This will prevent a max field limit exception when the source index has more fields than the query index limit
      */
     private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) {
         val getSettingsResponse: GetSettingsResponse = client.suspendUntil {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
index b24962aa5..a2770bc7a 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt
@@ -7,17 +7,21 @@ package org.opensearch.alerting.util
 
 import org.opensearch.action.ActionListener
 import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
+import org.opensearch.action.support.IndicesOptions
 import org.opensearch.action.support.master.AcknowledgedResponse
 import org.opensearch.alerting.alerts.AlertIndices
 import org.opensearch.alerting.core.ScheduledJobIndices
 import org.opensearch.client.IndicesAdminClient
 import org.opensearch.cluster.ClusterState
 import org.opensearch.cluster.metadata.IndexMetadata
+import org.opensearch.cluster.metadata.IndexNameExpressionResolver
+import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.NamedXContentRegistry
 import org.opensearch.common.xcontent.XContentParser
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.util.IndexUtils
+import org.opensearch.index.IndexNotFoundException
 
 class IndexUtils {
 
@@ -130,5 +134,26 @@ class IndexUtils {
                 }
             }
         }
+
+        @JvmStatic
+        fun resolveAllIndices(indices: List<String>, clusterService: ClusterService, resolver: IndexNameExpressionResolver): List<String> {
+            val result = mutableListOf<String>()
+
+            indices.forEach { index ->
+                val concreteIndices = resolver.concreteIndexNames(
+                    clusterService.state(),
+                    IndicesOptions.lenientExpand(),
+                    true,
+                    index
+                )
+                result.addAll(concreteIndices)
+            }
+
+            if (result.size == 0) {
+                throw IndexNotFoundException(indices[0])
+            }
+
+            return result
+        }
     }
 }
diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
index 6085bef1d..8dc050dc0 100644
--- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
+++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt
@@ -380,8 +380,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
 
         val findings = searchFindings(monitor)
         assertEquals("Findings saved for test monitor", 2, findings.size)
-        assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
-        assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
+        val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
+        assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
     }
 
     fun `test execute monitor with new index added after first execution that generates alerts and findings`() {
@@ -410,14 +410,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
 
         var findings = searchFindings(monitor)
         assertEquals("Findings saved for test monitor", 2, findings.size)
-        assertTrue(
-            "Findings saved for test monitor expected 1 instead of ${findings[0].relatedDocIds}",
-
findings[0].relatedDocIds.contains("1") - ) - assertTrue( - "Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}", - findings[1].relatedDocIds.contains("5") - ) + + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) // clear previous findings and alerts deleteIndex(ALL_FINDING_INDEX_PATTERN) @@ -445,18 +440,11 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 3, findings.size) - assertTrue( - "Findings saved for test monitor expected 14 instead of ${findings[0].relatedDocIds}", - findings[0].relatedDocIds.contains("14") - ) - assertTrue( - "Findings saved for test monitor expected 51 instead of ${findings[1].relatedDocIds}", - findings[1].relatedDocIds.contains("51") - ) - assertTrue( - "Findings saved for test monitor expected 10 instead of ${findings[2].relatedDocIds}", - findings[2].relatedDocIds.contains("10") - ) + + foundFindings = findings.filter { + it.relatedDocIds.contains("14") || it.relatedDocIds.contains("51") || it.relatedDocIds.contains("10") + } + assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } fun `test document-level monitor when alias only has write index with 0 docs`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 5fb3508bd..aa8d50fab 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest @@ -506,6 +507,178 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(indices.isNotEmpty()) } + fun `test execute monitor with multiple indices in input success`() { + + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", 
+ "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex1, "1", testDoc) + indexDoc(testSourceIndex2, "1", testDoc) + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 2, foundFindings.size) + + indexDoc(testSourceIndex1, "2", testDoc) + indexDoc(testSourceIndex2, "2", testDoc) + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 4, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 2, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + + fun `test execute monitor with multiple indices in input first index gets deleted`() { + // Index #1 does not exist + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex2, "1", testDoc) + + client().admin().indices().delete(DeleteIndexRequest(testSourceIndex1)).get() + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 1, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + indexDoc(testSourceIndex2, "2", testDoc) + 
executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + + fun `test execute monitor with multiple indices in input second index gets deleted`() { + // Second index does not exist + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1, testSourceIndex2), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern) + ) + val monitorResponse = createMonitor(monitor) + client().admin().indices().refresh(RefreshRequest("*")) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + indexDoc(testSourceIndex1, "1", testDoc) + + client().admin().indices().delete(DeleteIndexRequest(testSourceIndex2)).get() + + val id = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + var findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 1, findings.size) + var foundFindings = findings.filter { it.relatedDocIds.contains("1") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + indexDoc(testSourceIndex1, "2", testDoc) + + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + findings = searchFindings(id, "custom_findings_index*", true) + assertEquals("Findings saved for test monitor", 2, findings.size) + foundFindings = findings.filter { it.relatedDocIds.contains("2") } + assertEquals("Didn't find 2 findings", 1, foundFindings.size) + + val indices = getAllIndicesFromPattern("custom_findings_index*") + Assert.assertTrue(indices.isNotEmpty()) + } + fun `test execute pre-existing monitor and update`() { val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) 
.settings(Settings.builder().put("index.hidden", true).build()) @@ -636,6 +809,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1) } + fun `test execute monitor with empty source index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + val monitorId = monitorResponse.id + var executeMonitorResponse = executeMonitor(monitor, monitorId, false) + + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + + refreshIndex(customFindingsIndex) + + var findings = searchFindings(monitorId, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + + indexDoc(index, "1", testDoc) + + executeMonitor(monitor, monitorId, false) + + refreshIndex(customFindingsIndex) + + findings = searchFindings(monitorId, customFindingsIndex) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + } + fun `test execute GetFindingsAction with monitorId param`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))