Skip to content

Commit

Permalink
[Backport 2.x] Add an _exists_ check to document level monitor queries (
Browse files Browse the repository at this point in the history
#1425) (#1456)

* Add an _exists_ check to document level monitor queries (#1425)

* clean up and add integ tests

Signed-off-by: Joanne Wang <[email protected]>

* refactored out common method and renamed test

Signed-off-by: Joanne Wang <[email protected]>

* remove _exists_ flag

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>

* fix integ test

Signed-off-by: Joanne Wang <[email protected]>

---------

Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon authored Mar 8, 2024
1 parent d3ede1a commit cb890f0
Show file tree
Hide file tree
Showing 2 changed files with 366 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
var query = it.query
conflictingPaths.forEach { conflictingPath ->
if (query.contains(conflictingPath)) {
query = transformExistsQuery(query, conflictingPath, "<index>", monitorId)
query = query.replace("$conflictingPath:", "${conflictingPath}_<index>_$monitorId:")
filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!)
}
Expand All @@ -418,6 +419,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
var query = it.query
flattenPaths.forEach { fieldPath ->
if (!conflictingPaths.contains(fieldPath.first)) {
query = transformExistsQuery(query, fieldPath.first, sourceIndex, monitorId)
query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:")
}
}
Expand Down Expand Up @@ -448,6 +450,26 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
}

/**
* Transforms the query if it includes an _exists_ clause to append the index name and the monitor id to the field value
*/
private fun transformExistsQuery(query: String, conflictingPath: String, indexName: String, monitorId: String): String {
return query
.replace("_exists_: ", "_exists_:") // remove space to read exists query as one string
.split("\\s+".toRegex())
.joinToString(separator = " ") { segment ->
if (segment.contains("_exists_:")) {
val trimSegement = segment.trim { it == '(' || it == ')' } // remove any delimiters from ends
val (_, value) = trimSegement.split(":", limit = 2) // split into key and value
val newString = if (value == conflictingPath)
segment.replace(conflictingPath, "${conflictingPath}_${indexName}_$monitorId") else segment
newString
} else {
segment
}
}
}

private suspend fun updateQueryIndexMappings(
monitor: Monitor,
monitorMetadata: MonitorMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,350 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals(1, output.objectMap("trigger_results").values.size)
}

fun `test execute monitor generates alerts and findings with NOT EQUALS query and EXISTS query`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val query = "NOT test_field: \"us-east-1\" AND _exists_: test_field"
val docQuery = DocLevelQuery(query = query, name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex")))

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
}

fun `test document-level monitor when index alias contain docs that do match a NOT EQUALS query and EXISTS query`() {
val aliasName = "test-alias"
createIndexAlias(
aliasName,
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
)

val docQuery = DocLevelQuery(query = "NOT test_field:\"us-east-1\" AND _exists_: test_field", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)
)

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"@timestamp": "$testTime",
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(aliasName, "1", testDoc)
var response = executeMonitor(monitor.id)
var output = entityAsMap(response)
var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)

rolloverDatastream(aliasName)
indexDoc(aliasName, "2", testDoc)
response = executeMonitor(monitor.id)
output = entityAsMap(response)
searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)

deleteIndexAlias(aliasName)
}

fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS and EXISTS query operator`() {
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
val testQueryName = "wildcard-test-query"
val testIndex = createTestIndex("${testIndexPrefix}1")
val testIndex2 = createTestIndex("${testIndexPrefix}2")

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val query = "NOT test_field:\"us-west-1\" AND _exists_: test_field"
val docQuery = DocLevelQuery(query = query, name = testQueryName, fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex2, "5", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex2")))

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
}

fun `test execute monitor with indices having fields with same name different field mappings in multiple indices with NOT EQUALS`() {
val testIndex = createTestIndex(
"test1",
""""properties": {
"source": {
"properties": {
"device": {
"properties": {
"hwd": {
"properties": {
"id": {
"type":"text",
"analyzer":"whitespace"
}
}
}
}
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)

val testIndex2 = createTestIndex(
"test2",
""""properties": {
"test_field" : {
"type":"keyword"
}
}
""".trimIndent()
)

val testIndex4 = createTestIndex(
"test4",
""""properties": {
"source": {
"properties": {
"device": {
"properties": {
"hwd": {
"properties": {
"id": {
"type":"text"
}
}
}
}
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)

val testDoc1 = """{
"source" : {"device" : {"hwd" : {"id" : "123456"}} },
"nested_field": { "test1": "some text" }
}"""
val testDoc2 = """{
"nested_field": { "test1": "some text" },
"test_field": "123456"
}"""

val docQuery1 = DocLevelQuery(
query = "NOT test_field:\"12345\" AND _exists_: test_field",
name = "4",
fields = listOf()
)
val docQuery2 = DocLevelQuery(
query = "NOT source.device.hwd.id:\"12345\" AND _exists_: source.device.hwd.id",
name = "5",
fields = listOf()
)

val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex4, "1", testDoc1)
indexDoc(testIndex2, "1", testDoc2)
indexDoc(testIndex, "1", testDoc1)
indexDoc(testIndex, "2", testDoc2)

executeMonitor(monitor.id)

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 4, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 4, findings.size)

val request = """{
"size": 0,
"query": {
"match_all": {}
}
}"""
val httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) }
}

fun `test execute monitor with indices having fields with same name but different field mappings with NOT EQUALS`() {
val testIndex = createTestIndex(
"test1",
""""properties": {
"source": {
"properties": {
"id": {
"type":"text",
"analyzer":"whitespace"
}
}
},
"test_field" : {
"type":"text",
"analyzer":"whitespace"
}
}
""".trimIndent()
)

val testIndex2 = createTestIndex(
"test2",
""""properties": {
"source": {
"properties": {
"id": {
"type":"text"
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)
val testDoc = """{
"source" : {"id" : "12345" },
"nested_field": { "test1": "some text" },
"test_field": "12345"
}"""

val docQuery = DocLevelQuery(
query = "(NOT test_field:\"123456\" AND _exists_:test_field) AND source.id:\"12345\"",
name = "5",
fields = listOf()
)
val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex2, "1", testDoc)

executeMonitor(monitor.id)

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)

// as mappings of source.id & test_field are different so, both of them expands
val expectedQueries = listOf(
"(NOT test_field_test2_${monitor.id}:\"123456\" AND _exists_:test_field_test2_${monitor.id}) " +
"AND source.id_test2_${monitor.id}:\"12345\"",
"(NOT test_field_test1_${monitor.id}:\"123456\" AND _exists_:test_field_test1_${monitor.id}) " +
"AND source.id_test1_${monitor.id}:\"12345\""
)

val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.forEach { hit ->
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
assertTrue(expectedQueries.contains(query))
}
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
Expand Down

0 comments on commit cb890f0

Please sign in to comment.