Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/composite workflow execution v1 #1

Open
wants to merge 18 commits into
base: feature/composite-workflow-transport-crud-execution
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
56bba0d
Added integrations tests for checking workflow creation and update sc…
stevanbz Jan 27, 2023
e0af305
Added transport layer for getting and deleting the workflow
stevanbz Jan 31, 2023
feebf0e
Updated getting and deleting the workflow in order to check if the mo…
stevanbz Feb 23, 2023
22eb900
When deleting the monitor, added a check if the monitor is part of th…
stevanbz Feb 27, 2023
d6269ab
Added transport workflow execution layer. Adjusted monitor runners to…
stevanbz Feb 13, 2023
c2588a0
Removed unused classes
stevanbz Feb 13, 2023
ad01b70
Added rest action for executing the workflow
stevanbz Feb 13, 2023
5190726
Added integration tests for workflow execution. Added script modules …
stevanbz Feb 23, 2023
a1e0408
Added workflow execution run result and refactored ExecutionWorkflowR…
stevanbz Feb 27, 2023
a77d9bb
Added integration tests for workflow execution. PR comments addressed
stevanbz Mar 1, 2023
b6f17a8
Code adjusted to comments. Wrapped exceptions when executing workflow
stevanbz Mar 2, 2023
8ded8b8
Added logic for deleting the workflow underlying monitors. Added vali…
stevanbz Mar 8, 2023
a593d38
Added workflow metadata
stevanbz Mar 9, 2023
8e0d28d
Added mappings for the workflow-metadata. Added integration tests for…
stevanbz Mar 9, 2023
af86c69
Renamed properties. Added workflow metadata dryrun integration test t…
stevanbz Mar 13, 2023
4dd13ed
Added workflow integration test for verifying changing the order of t…
stevanbz Mar 13, 2023
a14dfea
Renamed methods for generating the workflows
stevanbz Mar 13, 2023
486c5ab
Added test when updating the non-existing workflow
stevanbz Mar 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated getting and deleting the workflow in order to check if the mo…
…nitor index is not initialized yet. Added workflow crud test cases

Signed-off-by: Stevan Buzejic <[email protected]>
stevanbz committed Feb 23, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit feebf0e3f769eba0b5e849813b10caffc95533cc
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
@@ -18,6 +19,7 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
@@ -36,10 +38,13 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.index.IndexNotFoundException
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)

class TransportDeleteWorkflowAction @Inject constructor(
transportService: TransportService,
val client: Client,
@@ -64,7 +69,7 @@ class TransportDeleteWorkflowAction @Inject constructor(

val user = readUserFromThreadContext(client)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId)
.setRefreshPolicy(transformedRequest.refreshPolicy)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

if (!validateUserBackendRoles(user, actionListener)) {
return
@@ -111,7 +116,16 @@ class TransportDeleteWorkflowAction @Inject constructor(
)
}
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
if (t is IndexNotFoundException) {
actionListener.onFailure(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
)
)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}

@@ -122,7 +136,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Workflow with $workflowId is not found", RestStatus.NOT_FOUND)
OpenSearchStatusException("Workflow not found.", RestStatus.NOT_FOUND)
)
)
}
@@ -134,6 +148,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
}

private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse {
log.debug("Deleting the workflow with id ${deleteRequest.id()}")
return client.suspendUntil { delete(deleteRequest, it) }
}

Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.commons.alerting.action.GetWorkflowResponse
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.index.IndexNotFoundException
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
@@ -117,7 +118,16 @@ class TransportGetWorkflowAction @Inject constructor(
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
if (t is IndexNotFoundException) {
actionListener.onFailure(
OpenSearchStatusException(
"Workflow not found",
RestStatus.NOT_FOUND
)
)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}
)
Original file line number Diff line number Diff line change
@@ -463,6 +463,12 @@ class TransportIndexCompositeWorkflowAction @Inject constructor(
}

suspend fun validateRequest(request: IndexWorkflowRequest) {
if (request.workflow.inputs.isEmpty())
throw AlertingException.wrap(IllegalArgumentException("Input list can not be empty."))

if (request.workflow.inputs[0] !is CompositeInput)
throw AlertingException.wrap(IllegalArgumentException("When creating a workflow input must be CompositeInput"))

val compositeInput = request.workflow.inputs[0] as CompositeInput
val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList())

Original file line number Diff line number Diff line change
@@ -224,6 +224,7 @@ fun randomDocumentLevelMonitor(
}

fun randomWorkflowMonitor(
id: String = Workflow.NO_ID,
monitorIds: List<String>,
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
@@ -241,6 +242,7 @@ fun randomWorkflowMonitor(
}

return Workflow(
id = id,
name = name,
enabled = enabled,
schedule = schedule,
@@ -253,6 +255,7 @@ fun randomWorkflowMonitor(
}

fun randomWorkflowMonitorWithDelegates(
id: String = Workflow.NO_ID,
delegates: List<Delegate>,
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
@@ -262,6 +265,7 @@ fun randomWorkflowMonitorWithDelegates(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
return Workflow(
id = id,
name = name,
enabled = enabled,
schedule = schedule,
Loading