Skip to content

Commit

Permalink
optimize execution of workflow consisting of bucket-level followed by…
Browse files Browse the repository at this point in the history
… doc-level monitors

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Nov 13, 2024
1 parent fbd3bf0 commit bc90d8f
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,33 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts(
monitorResult,
it as DocumentLevelTrigger,
monitor,
queryToDocIds,
dryrun,
executionId,
workflowRunContext
)
}
}
}

Expand Down Expand Up @@ -349,6 +363,50 @@ class TransportDocLevelMonitorFanOutAction
}
}

private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
listOf(),
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
executionId = executionId,
workflorwRunContext = workflowRunContext
)
for (action in trigger.actions) {
this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
}

if (!dryrun && monitor.id != Monitor.NO_ID) {
val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
val actionExecutionResults = actionResults.values.map { actionRunResult ->
ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
}
val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)

retryPolicy.let {
alertService.saveAlerts(
monitor.dataSources,
listOf(updatedAlert),
it,
routingId = monitor.id
)
}
}
}
return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
}

private suspend fun runForEachDocTrigger(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
Expand Down Expand Up @@ -512,7 +570,7 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding) {
if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -524,13 +582,15 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

try {
findings.forEach { finding ->
publishFinding(monitor, finding)
if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
this.findingsToTriggeredQueries += findingsToTriggeredQueries

Expand Down Expand Up @@ -688,6 +748,7 @@ class TransportDocLevelMonitorFanOutAction
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitor,
indexExecutionCtx.concreteIndexName,
shard,
from,
Expand Down Expand Up @@ -870,6 +931,7 @@ class TransportDocLevelMonitorFanOutAction
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
monitor: Monitor,
index: String,
shard: String,
prevSeqNo: Long?,
Expand All @@ -883,8 +945,16 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
}
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
}

val request: SearchRequest = SearchRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
)
)
}

fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() {
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field_1").field("test_field_1")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
// Bucket level monitor will reduce the size of matched doc ids on those that belong
// to a bucket that contains more than 1 document after term grouping
val triggerScript = """
params.docCount > 1
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null,
)
)
val bucketCustomAlertsIndex = "custom_alerts_index"
val bucketCustomFindingsIndex = "custom_findings_index"
val bucketCustomFindingsIndexPattern = "custom_findings_index-1"

val bucketLevelMonitorResponse = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(
findingsEnabled = true,
alertsIndex = bucketCustomAlertsIndex,
findingsIndex = bucketCustomFindingsIndex,
findingsIndexPattern = bucketCustomFindingsIndexPattern
)
)
)!!

val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf())
val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf())
val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3))
val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val docCustomAlertsIndex = "custom_alerts_index"
val docCustomFindingsIndex = "custom_findings_index"
val docCustomFindingsIndexPattern = "custom_findings_index-1"
var docLevelMonitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(docTrigger),
dataSources = DataSources(
alertsIndex = docCustomAlertsIndex,
findingsIndex = docCustomFindingsIndex,
findingsIndexPattern = docCustomFindingsIndexPattern
),
ignoreFindingsAndAlerts = true
)

val docLevelMonitorResponse = createMonitor(docLevelMonitor)!!
// 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor)
var workflow = randomWorkflow(
monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id),
enabled = false,
auditDelegateMonitorAlerts = false
)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
assertNotNull(workflowById)

// Creates 5 documents
insertSampleTimeSerializedData(
index,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2",
"test_value_2",
"test_value_3"
)
)

val workflowId = workflowResponse.id
// 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4)
// 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth)
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
assertNotNull(executeWorkflowResponse)

for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) {
if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) {
val searchResult = monitorRunResults.inputResults.results.first()

@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
?.get("buckets") as List<kotlin.collections.Map<String, Any>>
assertEquals("Incorrect search result", 3, buckets.size)

val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId)
assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2)
assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4"))
} else {
assertEquals(1, monitorRunResults.inputResults.results.size)
val values = monitorRunResults.triggerResults.values
assertEquals(1, values.size)

val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId)
assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1)
assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4"))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ fun randomDocumentLevelMonitor(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
dataSources: DataSources,
ignoreFindingsAndAlerts: Boolean? = false,
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources,
ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner
)
}

Expand Down

0 comments on commit bc90d8f

Please sign in to comment.