From 3495cf83ba38ccd8eb5cf49a189474b5bc76846c Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Nov 2024 23:09:14 +0000 Subject: [PATCH] update code-review comments Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 4 ++- .../org/opensearch/alerting/InputService.kt | 2 +- .../opensearch/alerting/WorkflowService.kt | 9 +++--- .../TransportDocLevelMonitorFanOutAction.kt | 30 +++++++++++++------ .../workflow/CompositeWorkflowRunner.kt | 6 ++-- .../SampleRemoteMonitorRestHandler.java | 3 ++ 6 files changed, 36 insertions(+), 18 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index b5b2c4f49..267a1be6d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -138,7 +138,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } // Map of document ids per index when monitor is workflow delegate and has chained findings - val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first + val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() @@ -226,6 +227,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { concreteIndices, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + findingIdsForMatchingDocIds ) val shards = mutableSetOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index c7eeac833..3c8194ceb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -90,7 +90,7 @@ class InputService( periodStart = periodStart, periodEnd = periodEnd, prevResult = prevResult, - matchingDocIdsPerIndex = matchingDocIdsPerIndex, + matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first, returnSampleDocs = false ) val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt index 32d7971f1..1379a1fe3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -41,15 +41,16 @@ class WorkflowService( * @param chainedMonitors Monitors that have previously executed * @param workflowExecutionId Execution id of the current workflow */ - suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): Map> { + suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List, workflowExecutionId: String): + Pair>, List> { if (chainedMonitors.isEmpty()) - return emptyMap() + return Pair(emptyMap(), listOf()) val dataSources = chainedMonitors[0].dataSources try { val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil { exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it) } - if (existsResponse.isExists == false) return emptyMap() + if (existsResponse.isExists == false) return Pair(emptyMap(), listOf()) // Search findings index to match id of monitors and workflow execution id val bqb = QueryBuilders.boolQuery() .filter( @@ -83,7 +84,7 @@ class WorkflowService( for (finding in findings) { indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds) } - return indexToRelatedDocIdsMap + return Pair(indexToRelatedDocIdsMap, findings.map { it.id }) } catch (t: Exception) { log.error("Error getting finding doc ids: ${t.message}", t) throw AlertingException.wrap(t) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 6d1ac90cc..590732da4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -297,7 +297,11 @@ class TransportDocLevelMonitorFanOutAction createFindings(monitor, docsToQueries, idQueryMap, true) } } else { - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + /** + * if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger + * generates a single alert with multiple findings. + */ + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { monitor.triggers.forEach { triggerResults[it.id] = runForEachDocTrigger( monitorResult, @@ -312,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction workflowRunContext = workflowRunContext ) } - } else if (monitor.ignoreFindingsAndAlerts == true) { + } else if (monitor.shouldPersistFindingsAndAlerts == true) { monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts( + triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts( monitorResult, it as DocumentLevelTrigger, monitor, @@ -363,7 +367,10 @@ class TransportDocLevelMonitorFanOutAction } } - private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts( + /** + * run doc-level triggers ignoring findings and alerts and generating a single alert. + */ + private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts( monitorResult: MonitorRunResult, trigger: DocumentLevelTrigger, monitor: Monitor, @@ -374,9 +381,14 @@ class TransportDocLevelMonitorFanOutAction ): DocumentLevelTriggerRunResult { val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds) if (triggerResult.triggeredDocs.isNotEmpty()) { + val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) { + workflowRunContext.matchingDocIdsPerIndex.second + } else { + listOf() + } val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val alert = alertService.composeDocLevelAlert( - listOf(), + findingIds, triggerResult.triggeredDocs, triggerCtx, monitorResult.alertError() ?: triggerResult.alertError(), @@ -570,7 +582,7 @@ class TransportDocLevelMonitorFanOutAction .string() log.debug("Findings: $findingStr") - if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) { + if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -582,7 +594,7 @@ class TransportDocLevelMonitorFanOutAction bulkIndexFindings(monitor, indexRequests) } - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { try { findings.forEach { finding -> publishFinding(monitor, finding) @@ -945,11 +957,11 @@ class TransportDocLevelMonitorFanOutAction val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) { + if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) { if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } - } else if (monitor.ignoreFindingsAndAlerts == true) { + } else if (monitor.shouldPersistFindingsAndAlerts == true) { val docIdsParam = mutableListOf() if (docIds != null) { docIdsParam.addAll(docIds) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 366a75e1c..2ca8f978a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { var lastErrorDelegateRun: Exception? = null for (delegate in delegates) { - var indexToDocIds = mapOf>() + var indexToDocIdsWithFindings: Pair>, List>? = Pair(mapOf(), listOf()) var delegateMonitor: Monitor delegateMonitor = monitorsById[delegate.monitorId] ?: throw AlertingException.wrap( @@ -118,7 +118,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { } try { - indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) + indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId) } catch (e: Exception) { logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e) return WorkflowRunResult( @@ -131,7 +131,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { workflowId = workflowMetadata.workflowId, workflowMetadataId = workflowMetadata.id, chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, - matchingDocIdsPerIndex = indexToDocIds, + matchingDocIdsPerIndex = indexToDocIdsWithFindings!!, auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true else workflow.auditDelegateMonitorAlerts!! ) diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 9875ef55f..085a8db80 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -95,6 +95,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( @@ -156,6 +157,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( @@ -240,6 +242,7 @@ public void onFailure(Exception e) { Map.of(), new DataSources(), false, + false, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(