Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/composite workflow execution v1 #1

Open
wants to merge 18 commits into
base: feature/composite-workflow-transport-crud-execution
Choose a base branch
from

Conversation

stevanbz
Copy link
Owner

Issue #, if available:

Description of changes:

  • Created WorkflowRunner logic that iterates through the list of the monitors and sequentially executes monitor depending of the type
  • Created logic for getting the chained monitor finding doc ids (two steps) -> 1. getting the finding docIds per execution (using the workflowRunContext) 2. Based on the docIds used in the first step relevant documents are determined for the currently processed monitor (Relevant classes: WorkflowService, CompositeWorkflowRunner, WorkflowRunContext - which is instantiated every time run happens)
  • Created TransportExecuteWorkflowAction and RestExecuteWorkflowAction
  • Added integration tests that are testing the workflow execution. WorkflowRunnerIT.test execute workflow with custom alerts and finding index with doc level and bucket level delegates can be ignored because in AlertingSingleNode test cases it's very hard to create bucket level monitors since this test suite is mocking ScriptService which is responsible for loading Scripts

CheckList:
[ ] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@eirsep
Copy link

eirsep commented Feb 20, 2023

can you look into adding this painless script module at plugin load test
there will be some method you can override and register the necessary plugin

Let's look into how we can verify composite monitors containing bucket level monitors

…nitor index is not initialized yet. Added workflow crud test cases

Signed-off-by: Stevan Buzejic <[email protected]>
Copy link

@eirsep eirsep left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, Stevan
have reviewed 50% of the PR
will review more while you can address the comments

@@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss offline about cluster/node level settings for composite workflows

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sounds good.

@@ -59,7 +62,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowExecutionContext: WorkflowRunContext?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: name the variable same as the type

@@ -389,10 +396,22 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))

if (workflowRunContext != null && !workflowRunContext.indexToDocIds.isNullOrEmpty()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we applying this logic here and not in InputService where the actual search query is being executed ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you are right. This logic can be removed from here - I forgot to remove it once I added in input service. Tnx and good catch!

@@ -125,6 +127,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

// If monitor execution is triggered from a workflow
val indexToRelatedDocIdsMap = workflowRunContext?.indexToDocIds
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we intialize this just before its usage instead of here?

@@ -105,6 +122,28 @@ class InputService(
}
}

private fun updateInputQueryWithFindingDocIds(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments/javadocs to explain what we intend to do wherever we are using chained findings filtering


// Rewrite query to consider the doc ids per given index
if (chainedFindingExist(indexToDocIds)) {
val updatedSourceQuery = updateInputQueryWithFindingDocIds(input.query.query(), indexToDocIds!!)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null check for query required?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing at input query
add a filter after search query is constructed

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null check for query required?

You are right. Adding the null check.

Copy link
Owner Author

@stevanbz stevanbz Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing at input query add a filter after search query is constructed

Since rewrittenQuery.query() returns QueryBuilder()! (which can be null) we must do a cast to a BoolQueryBuilder (I guess) which then later we would need to set again to a rewrittenQuery.query.
You can see here that later on query is transformed in a String so it wouldn't be so straight forward to add a filter.
I don't have any more idea how to do this in elegant way (maybe lacking domain knowledge around the OpenSearch classes I can use for this purpose)- if you can give me a hint or a code snippet how I can do, it would be good. Tnx!

@@ -105,6 +122,28 @@ class InputService(
}
}

private fun updateInputQueryWithFindingDocIds(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a common methods used in all the monitor types

Copy link
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same - but then I saw that the search query is executed on a different way depending of the monitor type.
Ie. here you can see how the doc level monitor is getting the matching docs. So, for example, doc level monitor iterates through the list of indices and getting the documents index by index. That's why I adjusted getting the matched docIds on doc level monitor to be aligned with existing logic. Check it out here

val xContentRegistry: NamedXContentRegistry,
) {

suspend fun getFindingDocIdsPerMonitorExecution(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getFindingDocIdsByExecutionId*

.seqNoAndPrimaryTerm(true)
)
.indices(chainedMonitor.dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle indexNotFound

return buildMonitors(searchResponse)
}

private fun buildMonitors(response: SearchResponse): List<Monitor> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this function be called parseMonitors

return monitors
}

suspend fun getDocIdsPerFindingIndex(monitorId: String, workflowExecutionId: String): Map<String, List<String>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function will be removed since it's not used at all.

… consider the workflow execution id

Added worfklow service used for retrieving monitors and their findings. Added business logic for considering the chained monitors

Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Stevan Buzejic <[email protected]>
…when loading the cluster

Signed-off-by: Stevan Buzejic <[email protected]>

class ExecuteWorkflowRequest : ActionRequest {
val dryrun: Boolean
val requestEnd: TimeValue
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is requestEnd?

Copy link
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-paste of ExecuteMonitorRequest. Used in CompositeWorkflowRunner - and passed to concrete monitor runner - ie in bucketLevelMonitors used for defining the search params when creating findings. Check it out here

import org.opensearch.commons.alerting.model.Workflow
import java.io.IOException

class ExecuteWorkflowRequest : ActionRequest {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs for field

)

override fun validate(): ActionRequestValidationException? {
return null
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validations?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check. Tnx and good point


class ExecuteWorkflowResponse : ActionResponse, ToXContentObject {

val workflowRunResult: List<MonitorRunResult<*>>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should store other fields like workflow execution start, and end time, status=failed, successful

Copy link
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will add those fields and appropriate logic around them

return listOf()
}

override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need replacedRoutes. we are not replacing routes. this would be a new API

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing class completely. Sorry

TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
val workflowExecutionId = UUID.randomUUID().toString()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make this execution id more deterministic..
workflowId+timestamp

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like:
val workflowExecutionId = UUID.randomUUID().toString() + LocalDateTime.now()
What do you think?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to something like:
val executionId = workflow.id.plus(LocalDateTime.now()).plus(UUID.randomUUID().toString())

): MonitorRunResult<*> {
TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return workflowRunResult which should contain list of monitorRunResult

return indexToRelatedDocIdsMap
}

suspend fun searchMonitors(monitors: List<String>, size: Int, owner: String?): List<Monitor> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to getMonitorsById

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is owner field used?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No will remove it. Good catch


val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
// Fetch monitors by ids
val monitors = monitorCtx.workflowService!!.searchMonitors(delegates.map { it.monitorId }, delegates.size, workflow.owner)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need owner field?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't. Removing

// Validate the monitors size
if (delegates.size != monitors.size) {
val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString()
throw IllegalStateException("Delegate monitors don't exist $diffMonitorIds")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz also log workflow id in the message

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Also will add a logs on the beginning and end of workflow execution

…esponse class

Code adjusted according to comments

Signed-off-by: Stevan Buzejic <[email protected]>
@stevanbz stevanbz force-pushed the feature/composite-workflow-execution-v1 branch from d94c257 to a1e0408 Compare February 27, 2023 22:48
var indexToDocIds = mapOf<String, List<String>>()
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with alerting exception

* @param chainedMonitor Monitor that is previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle indexNotFound and return empty

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking and let me elaborate a little bit my thinking and proposed solution:
Let's catch all the exceptions that can be raised, and wrap them up in AlertingException (check it out here). The caller function - the function in CompositeWorkflowRunner (here) will do a check and return empty workflow run result. What do you think?

?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
if (delegate.chainedFindings != null) {
val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId]
?: throw IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with alerting exception

dryRun,
workflowRunContext
)
} else {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: use else if for query level and throw unsupported exception

Copy link
Owner Author

@stevanbz stevanbz Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I also wrap into alerting exception or? Ie. something like this:
Something like this:
else if(delegateMonitor.isQueryLevelMonitor()){ QueryLevelMonitorRunner.runMonitor( delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext ) } else { throw AlertingException.wrap( IllegalStateException("Unsupported monitor type") ) }

data class WorkflowRunContext(
val chainedMonitorId: String?,
val workflowExecutionId: String,
val indexToDocIds: Map<String, List<String>>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indexToDocIds is not a good variable name. Someone reading the code would not understand that this is the input source.

Copy link
Owner Author

@stevanbz stevanbz Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about "matchingDocIdsPerIndex"?

@eirsep
Copy link

eirsep commented Mar 8, 2023

Let's have latestRunTime and latestExecutionId in workflow object or workflow metadata object.

…dation if the query monitor is part of the workflow chain

Signed-off-by: Stevan Buzejic <[email protected]>
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only for legacy APIs. This is a new API, so we should not have this

stevanbz added 2 commits March 9, 2023 18:47
Signed-off-by: Stevan Buzejic <[email protected]>
… checking workflow metadata. Changed flow of workflow execution

Signed-off-by: Stevan Buzejic <[email protected]>
@stevanbz stevanbz force-pushed the feature/composite-workflow-execution-v1 branch from 20de168 to 8e0d28d Compare March 9, 2023 21:17
@stevanbz stevanbz changed the base branch from feature/composite-workflow-v1 to feature/composite-workflow-transport-crud-execution March 9, 2023 22:41
@stevanbz
Copy link
Owner Author

can you look into adding this painless script module at plugin load test there will be some method you can override and register the necessary plugin

Let's look into how we can verify composite monitors containing bucket level monitors

@stevanbz stevanbz closed this Mar 13, 2023
@stevanbz stevanbz reopened this Mar 13, 2023
…hat verify that workflow metadata is not created

Signed-off-by: Stevan Buzejic <[email protected]>
…he monitors once the workflow is updated

Signed-off-by: Stevan Buzejic <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants