Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/composite workflow execution v1 #1

Open
wants to merge 18 commits into
base: feature/composite-workflow-transport-crud-execution
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
56bba0d
Added integration tests for checking workflow creation and update sc…
stevanbz Jan 27, 2023
e0af305
Added transport layer for getting and deleting the workflow
stevanbz Jan 31, 2023
feebf0e
Updated getting and deleting the workflow in order to check if the mo…
stevanbz Feb 23, 2023
22eb900
When deleting the monitor, added a check if the monitor is part of th…
stevanbz Feb 27, 2023
d6269ab
Added transport workflow execution layer. Adjusted monitor runners to…
stevanbz Feb 13, 2023
c2588a0
Removed unused classes
stevanbz Feb 13, 2023
ad01b70
Added rest action for executing the workflow
stevanbz Feb 13, 2023
5190726
Added integration tests for workflow execution. Added script modules …
stevanbz Feb 23, 2023
a1e0408
Added workflow execution run result and refactored ExecutionWorkflowR…
stevanbz Feb 27, 2023
a77d9bb
Added integration tests for workflow execution. PR comments addressed
stevanbz Mar 1, 2023
b6f17a8
Code adjusted to comments. Wrapped exceptions when executing workflow
stevanbz Mar 2, 2023
8ded8b8
Added logic for deleting the workflow underlying monitors. Added vali…
stevanbz Mar 8, 2023
a593d38
Added workflow metadata
stevanbz Mar 9, 2023
8e0d28d
Added mappings for the workflow-metadata. Added integration tests for…
stevanbz Mar 9, 2023
af86c69
Renamed properties. Added workflow metadata dryrun integration test t…
stevanbz Mar 13, 2023
4dd13ed
Added workflow integration test for verifying changing the order of t…
stevanbz Mar 13, 2023
a14dfea
Renamed methods for generating the workflows
stevanbz Mar 13, 2023
486c5ab
Added test when updating the non-existing workflow
stevanbz Mar 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Code adjusted to comments. Wrapped exceptions when executing workflow
Signed-off-by: Stevan Buzejic <[email protected]>
stevanbz committed Mar 2, 2023
commit b6f17a8bc07a31cc0d52cd7124e70bad08a124a2
Original file line number Diff line number Diff line change
@@ -157,7 +157,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

// If monitor execution is triggered from a workflow
val indexToRelatedDocIdsMap = workflowRunContext?.indexToDocIds
val indexToRelatedDocIdsMap = workflowRunContext?.matchingDocIdsPerIndex

val matchingDocs = getMatchingDocs(
monitor,
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ class InputService(
val aggTriggerAfterKey: MutableMap<String, TriggerAfterKey> = mutableMapOf()

// If monitor execution is triggered from a workflow
val indexToDocIds = workflowRunContext?.indexToDocIds
val indexToDocIds = workflowRunContext?.matchingDocIdsPerIndex

// TODO: If/when multiple input queries are supported for Bucket-Level Monitor execution, aggTriggerAfterKeys will
// need to be updated to account for it
Original file line number Diff line number Diff line change
@@ -54,6 +54,8 @@ fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTI

fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR

fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR

/**
* Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used
* as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values.
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.Monitor
@@ -49,10 +50,14 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var indexToDocIds = mapOf<String, List<String>>()
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
?: throw AlertingException.wrap(
IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
if (delegate.chainedFindings != null) {
val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId]
?: throw IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
?: throw AlertingException.wrap(
IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
)
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, workflowExecutionId)
}

@@ -76,7 +81,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
dryRun,
workflowRunContext
)
} else {
} else if (delegateMonitor.isQueryLevelMonitor()) {
QueryLevelMonitorRunner.runMonitor(
delegateMonitor,
monitorCtx,
@@ -85,6 +90,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
dryRun,
workflowRunContext
)
} else {
throw AlertingException.wrap(
IllegalStateException("Unsupported monitor type")
)
}
resultList.add(runResult)
}
Original file line number Diff line number Diff line change
@@ -8,5 +8,5 @@ package org.opensearch.alerting.workflow
data class WorkflowRunContext(
val chainedMonitorId: String?,
val workflowExecutionId: String,
val indexToDocIds: Map<String, List<String>>
val matchingDocIdsPerIndex: Map<String, List<String>>
)
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ import java.util.concurrent.ExecutionException
class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {

fun `test execute workflow with custom alerts and finding index with doc level delegates`() {
val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex1 = "custom_alerts_index"
@@ -83,7 +83,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16644,
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "1", testDoc1)

@@ -93,7 +93,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "2", testDoc2)

@@ -103,7 +103,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16645,
"test_strict_date_time" : "$testTime",
"test_field" : "us-east-1"
"test_field_1" : "us-east-1"
}"""
indexDoc(index, "3", testDoc3)

@@ -131,7 +131,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
TermsValuesSourceBuilder("test_field_1").field("test_field_1")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
@@ -168,9 +168,9 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
)
)!!

val docQuery1 = DocLevelQuery(query = "test_field:\"test_value_2\"", name = "1")
val docQuery2 = DocLevelQuery(query = "test_field:\"test_value_1\"", name = "2")
val docQuery3 = DocLevelQuery(query = "test_field:\"test_value_3\"", name = "3")
val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1")
val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2")
val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3))
val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val docCustomAlertsIndex = "custom_alerts_index"
@@ -239,8 +239,8 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
}

fun `test execute workflow with custom alerts and finding index with bucket level and doc level delegates when doc level delegate is used in chained finding`() {
val docQuery1 = DocLevelQuery(query = "test_field:\"test_value_2\"", name = "1")
val docQuery2 = DocLevelQuery(query = "test_field:\"test_value_3\"", name = "2")
val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1")
val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "2")

var docLevelMonitor = randomDocumentLevelMonitor(
inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2))),
@@ -259,7 +259,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
TermsValuesSourceBuilder("test_field_1").field("test_field_1")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
@@ -294,7 +294,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
)!!

var docLevelMonitor1 = randomDocumentLevelMonitor(
// Match the documents with test_field: test_value_3
// Match the documents with test_field_1: test_value_3
inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)),
dataSources = DataSources(
Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
val testDoc = """
{
"test_strict_date_time": "$testTime",
"test_field": "$value",
"test_field_1": "$value",
"number": "$i"
}
""".trimIndent()
@@ -112,7 +112,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
.field("type", "date")
.field("format", "strict_date_time")
.endObject()
.startObject("test_field")
.startObject("test_field_1")
.field("type", "keyword")
.endObject()
.endObject()