diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 60fe2b684..6d1ac90cc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -297,19 +297,33 @@ class TransportDocLevelMonitorFanOutAction createFindings(monitor, docsToQueries, idQueryMap, true) } } else { - monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTrigger( - monitorResult, - it as DocumentLevelTrigger, - monitor, - idQueryMap, - docsToQueries, - queryToDocIds, - dryrun, - executionId = executionId, - findingIdToDocSource, - workflowRunContext = workflowRunContext - ) + if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTrigger( + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryToDocIds, + dryrun, + executionId = executionId, + findingIdToDocSource, + workflowRunContext = workflowRunContext + ) + } + } else if (monitor.ignoreFindingsAndAlerts == true) { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts( + monitorResult, + it as DocumentLevelTrigger, + monitor, + queryToDocIds, + dryrun, + executionId, + workflowRunContext + ) + } } } @@ -349,6 +363,50 @@ class TransportDocLevelMonitorFanOutAction } } + private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts( + monitorResult: MonitorRunResult, + trigger: DocumentLevelTrigger, + monitor: Monitor, + queryToDocIds: Map>, + dryrun: Boolean, + executionId: String, + workflowRunContext: WorkflowRunContext? + ): DocumentLevelTriggerRunResult { + val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) + if (triggerResult.triggeredDocs.isNotEmpty()) { + val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) + val alert = alertService.composeDocLevelAlert( + listOf(), + triggerResult.triggeredDocs, + triggerCtx, + monitorResult.alertError() ?: triggerResult.alertError(), + executionId = executionId, + workflorwRunContext = workflowRunContext + ) + for (action in trigger.actions) { + this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun) + } + + if (!dryrun && monitor.id != Monitor.NO_ID) { + val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap()) + val actionExecutionResults = actionResults.values.map { actionRunResult -> + ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0) + } + val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults) + + retryPolicy.let { + alertService.saveAlerts( + monitor.dataSources, + listOf(updatedAlert), + it, + routingId = monitor.id + ) + } + } + } + return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error) + } + private suspend fun runForEachDocTrigger( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, @@ -512,7 +570,7 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding) { + if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -524,13 +582,15 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - try { - findings.forEach { finding -> - publishFinding(monitor, finding) + if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + try { + findings.forEach { finding -> + publishFinding(monitor, finding) + } + } catch (e: Exception) { + // suppress exception + log.error("Optional finding callback failed", e) } - } catch (e: Exception) { - // suppress exception - log.error("Optional finding callback failed", e) } this.findingsToTriggeredQueries += findingsToTriggeredQueries @@ -688,6 +748,7 @@ class TransportDocLevelMonitorFanOutAction var to: Long = Long.MAX_VALUE while (to >= from) { val hits: SearchHits = searchShard( + monitor, indexExecutionCtx.concreteIndexName, shard, from, @@ -870,6 +931,7 @@ class TransportDocLevelMonitorFanOutAction * This method hence fetches only docs from shard which haven't been queried before */ private suspend fun searchShard( + monitor: Monitor, index: String, shard: String, prevSeqNo: Long?, @@ -883,8 +945,16 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (!docIds.isNullOrEmpty()) { - boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + } else if (monitor.ignoreFindingsAndAlerts == true) { + val docIdsParam = mutableListOf() + if (docIds != null) { + docIdsParam.addAll(docIds) + } + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam)) } val request: SearchRequest = SearchRequest() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index a790b1736..49c648e3c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { ) ) } + + fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong + // to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + val bucketCustomAlertsIndex = "custom_alerts_index" + val bucketCustomFindingsIndex = "custom_findings_index" + val bucketCustomFindingsIndexPattern = "custom_findings_index-1" + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = bucketCustomAlertsIndex, + findingsIndex = bucketCustomFindingsIndex, + findingsIndexPattern = bucketCustomFindingsIndexPattern + ) + ) + )!! + + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf()) + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf()) + val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3)) + val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val docCustomAlertsIndex = "custom_alerts_index" + val docCustomFindingsIndex = "custom_findings_index" + val docCustomFindingsIndexPattern = "custom_findings_index-1" + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(docTrigger), + dataSources = DataSources( + alertsIndex = docCustomAlertsIndex, + findingsIndex = docCustomFindingsIndex, + findingsIndexPattern = docCustomFindingsIndexPattern + ), + ignoreFindingsAndAlerts = true + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + // 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor) + var workflow = randomWorkflow( + monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id), + enabled = false, + auditDelegateMonitorAlerts = false + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4) + // 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth) + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) { + if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) { + val searchResult = monitorRunResults.inputResults.results.first() + + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg") + ?.get("buckets") as List> + assertEquals("Incorrect search result", 3, buckets.size) + + val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) + assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4")) + } else { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + + val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1) + assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4")) + } + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index ee4a5ece3..43272107b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -216,12 +216,14 @@ fun randomDocumentLevelMonitor( lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), withMetadata: Boolean = false, dataSources: DataSources, + ignoreFindingsAndAlerts: Boolean? = false, owner: String? = null ): Monitor { return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, + ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner ) }