diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 16272fd99..15ffeb57a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() { shards.remove("index") shards.remove("shards_count") - val nodeMap = getNodes(monitorCtx) + /** + * if fanout flag is disabled and force assign all shards to local node + * thus effectively making the fan-out a single node operation. + * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules + **/ + val localNode = monitorCtx.clusterService!!.localNode() + val nodeMap: Map = if (monitor.fanoutEnabled == true) { + getNodes(monitorCtx) + } else { + logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") + mapOf(localNode.id to localNode) + } + val nodeShardAssignments = distributeShards( monitorCtx, nodeMap.keys.toList(), diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 769c20ead..a769fc7af 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2750,6 +2750,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteDataStream(aliasName) } + fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + "\"index.number_of_shards\": 7" + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES), + fanoutEnabled = false + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + indexDoc(aliasName, "2", testDoc) + indexDoc(aliasName, "4", testDoc) + indexDoc(aliasName, "5", testDoc) + indexDoc(aliasName, "6", testDoc) + indexDoc(aliasName, "7", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + ) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "11", testDoc) + indexDoc(aliasName, "12", testDoc) + indexDoc(aliasName, "14", testDoc) + indexDoc(aliasName, "15", testDoc) + indexDoc(aliasName, "16", testDoc) + indexDoc(aliasName, "17", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES + ) + + deleteDataStream(aliasName) + } + fun `test execute monitor generates alerts and findings with renewable locks`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 2330974f4..77cf6a538 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -227,6 +227,26 @@ fun randomDocumentLevelMonitor( ) } +fun randomDocumentLevelMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false, + fanoutEnabled: Boolean? = true, +): Monitor { + return Monitor( + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), + fanoutEnabled = fanoutEnabled + ) +} + fun randomWorkflow( id: String = Workflow.NO_ID, monitorIds: List, diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 085a8db80..f03dbb017 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -96,7 +96,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( Monitor.NO_ID, @@ -158,7 +159,8 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( Monitor.NO_ID, @@ -243,7 +245,8 @@ public void onFailure(Exception e) { new DataSources(), false, false, - "sample-remote-monitor-plugin" + "sample-remote-monitor-plugin", + true ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( Monitor.NO_ID,