Skip to content

Commit

Permalink
shards assignment to local Node when fanout flag is disabled
Browse files Browse the repository at this point in the history
Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn committed Dec 17, 2024
1 parent 3755993 commit 8f37a77
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shards.remove("index")
shards.remove("shards_count")

val nodeMap = getNodes(monitorCtx)
/**
* if fanout flag is disabled and force assign all shards to local node
* thus effectively making the fan-out a single node operation.
* This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules
**/
val localNode = monitorCtx.clusterService!!.localNode()
val nodeMap: Map<String, DiscoveryNode> = if (monitor.fanoutEnabled == true) {
getNodes(monitorCtx)
} else {
logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}")
mapOf(localNode.id to localNode)
}

val nodeShardAssignments = distributeShards(
monitorCtx,
nodeMap.keys.toList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2750,6 +2750,65 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
deleteDataStream(aliasName)
}

fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() {
val aliasName = "test-alias"
createIndexAlias(
aliasName,
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent(),
"\"index.number_of_shards\": 7"
)

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES),
fanoutEnabled = false
)
)

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"@timestamp": "$testTime",
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(aliasName, "1", testDoc)
indexDoc(aliasName, "2", testDoc)
indexDoc(aliasName, "4", testDoc)
indexDoc(aliasName, "5", testDoc)
indexDoc(aliasName, "6", testDoc)
indexDoc(aliasName, "7", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES
)

rolloverDatastream(aliasName)
indexDoc(aliasName, "11", testDoc)
indexDoc(aliasName, "12", testDoc)
indexDoc(aliasName, "14", testDoc)
indexDoc(aliasName, "15", testDoc)
indexDoc(aliasName, "16", testDoc)
indexDoc(aliasName, "17", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES
)

deleteDataStream(aliasName)
}

fun `test execute monitor generates alerts and findings with renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
20 changes: 20 additions & 0 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,26 @@ fun randomDocumentLevelMonitor(
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
inputs: List<Input> = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
fanoutEnabled: Boolean? = true,
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(),
fanoutEnabled = fanoutEnabled
)
}

fun randomWorkflow(
id: String = Workflow.NO_ID,
monitorIds: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
"sample-remote-monitor-plugin",
true
);
IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down Expand Up @@ -158,7 +159,8 @@ public void onFailure(Exception e) {
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
"sample-remote-monitor-plugin",
true
);
IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down Expand Up @@ -243,7 +245,8 @@ public void onFailure(Exception e) {
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
"sample-remote-monitor-plugin",
true
);
IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(
Monitor.NO_ID,
Expand Down

0 comments on commit 8f37a77

Please sign in to comment.