From 3ad417d5d6ae88e7d30744824a0b1382704473a2 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Mon, 29 Jan 2024 16:38:28 +0100 Subject: [PATCH] kie-kogito-serverless-operator-361: Add data-index and job service startupProbes to the workflow Deployment --- controllers/platform/services/properties.go | 6 + controllers/platform/services/services.go | 17 +- .../common/constants/platform_services.go | 13 +- .../common/properties/application_test.go | 15 +- .../sonataflowplatform_controller_test.go | 4 +- controllers/workflowdef/utils.go | 113 +++++++ controllers/workflowdef/utils_suite_test.go | 32 ++ controllers/workflowdef/utils_test.go | 285 ++++++++++++++++++ 8 files changed, 465 insertions(+), 20 deletions(-) create mode 100644 controllers/workflowdef/utils.go create mode 100644 controllers/workflowdef/utils_suite_test.go create mode 100644 controllers/workflowdef/utils_test.go diff --git a/controllers/platform/services/properties.go b/controllers/platform/services/properties.go index dbe6dc3f8..c541f5b04 100644 --- a/controllers/platform/services/properties.go +++ b/controllers/platform/services/properties.go @@ -24,6 +24,8 @@ import ( "net/url" "strings" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" "github.com/apache/incubator-kie-kogito-serverless-operator/utils" "k8s.io/klog/v2" @@ -164,6 +166,7 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf if workflow != nil && !profiles.IsDevProfile(workflow) && di.IsServiceEnabled() { props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true") props.Set(constants.KogitoProcessInstancesEventsEnabled, "true") + props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true") di := NewDataIndexHandler(platform) p, err := di.GenerateWorkflowProperties() if err != nil { @@ -186,6 +189,9 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol)) js := NewJobServiceHandler(platform) if workflow != nil && !profiles.IsDevProfile(workflow) && js.IsServiceEnabled() { + if workflowdef.HasTimeouts(workflow) { + props.Set(constants.KogitoJobServiceHealthCheckEnabled, "true") + } p, err := js.GenerateWorkflowProperties() if err != nil { return nil, err diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index e49679e5b..05aedbb60 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -81,8 +81,6 @@ type PlatformServiceHandler interface { GetLocalServiceBaseUrl() string // GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service. GetServiceBaseUrl() string - // GetServiceUrl returns the service url, based on whether using local or cluster-scoped service. - GetServiceUrl() string // IsServiceEnabled returns true if the service is enabled in either the spec or the status.clusterPlatformRef. IsServiceEnabled() bool // SetServiceUrlInStatus sets the service url in status. if reconciled instance does not have service set in spec AND @@ -150,10 +148,6 @@ func (d DataIndexHandler) IsServiceEnabled() bool { return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus() } -func (d DataIndexHandler) GetServiceUrl() string { - return d.GetServiceBaseUrl() + constants.KogitoProcessInstancesEventsPath -} - func (d DataIndexHandler) GetServiceBaseUrl() string { if d.IsServiceEnabledInSpec() { return d.GetLocalServiceBaseUrl() @@ -236,8 +230,9 @@ func (d DataIndexHandler) GetServiceCmName() string { func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() if d.IsServiceEnabled() { + props.Set(constants.KogitoDataIndexURL, d.GetServiceBaseUrl()) props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessDefinitionsEventsPath) - props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceUrl()) + props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath) } return props, nil } @@ -313,10 +308,6 @@ func (j JobServiceHandler) IsServiceEnabled() bool { return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus() } -func (j JobServiceHandler) GetServiceUrl() string { - return j.GetServiceBaseUrl() + constants.JobServiceURLPath -} - func (j JobServiceHandler) GetServiceBaseUrl() string { if j.IsServiceEnabledInSpec() { return j.GetLocalServiceBaseUrl() @@ -411,8 +402,8 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() if j.IsServiceEnabled() { - // add data source reactive URL - props.Set(constants.JobServiceRequestEventsURL, j.GetServiceUrl()) + props.Set(constants.KogitoJobServiceURL, j.GetServiceBaseUrl()) + props.Set(constants.JobServiceRequestEventsURL, j.GetServiceBaseUrl()+constants.JobServiceJobEventsPath) } return props, nil } diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index 3db52e16c..e0f249273 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -32,7 +32,7 @@ const ( JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url" JobServiceURLProtocol = "http" JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url" - JobServiceURLPath = "/v2/jobs/events" + JobServiceJobEventsPath = "/v2/jobs/events" KogitoProcessEventsProtocol = "http" KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" @@ -42,7 +42,16 @@ const ( KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled" KogitoProcessDefinitionsEventsPath = "/definitions" KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled" - KogitoEventsVariablesEnabled = "kogito.events.variables.enabled" + // KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part + // of its start health check. + KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled" + // KogitoDataIndexURL configures the data index url, this value can be used internally by the workflow. + KogitoDataIndexURL = "kogito.data-index.url" + // KogitoJobServiceHealthCheckEnabled configures if a workflow must check for the job service availability as part + // of its start health check. + KogitoJobServiceHealthCheckEnabled = "kogito.jobs-service.health-enabled" + // KogitoJobServiceURL configures the jobs service, this value can be used internally by the workflow. + KogitoJobServiceURL = "kogito.jobs-service.url" KogitoServiceURLProperty = "kogito.service.url" KogitoServiceURLProtocol = "http" DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled` diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go index 2a95e9674..543247dd7 100644 --- a/controllers/profiles/common/properties/application_test.go +++ b/controllers/profiles/common/properties/application_test.go @@ -239,7 +239,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.NoError(t, err) generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 15, len(generatedProps.Keys())) + assert.Equal(t, 18, len(generatedProps.Keys())) assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) @@ -249,6 +249,9 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) + assert.Equal(t, "true", generatedProps.GetString(constants.KogitoDataIndexHealthCheckEnabled, "")) + assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoDataIndexURL, "")) + assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, "")) // disabling data index bypasses config of outgoing events url platform.Spec.Services.DataIndex.Enabled = nil @@ -256,7 +259,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.NoError(t, err) generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 13, len(generatedProps.Keys())) + assert.Equal(t, 14, len(generatedProps.Keys())) assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) @@ -265,6 +268,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) + assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, "")) // disabling job service bypasses config of outgoing events url platform.Spec.Services.JobService.Enabled = nil @@ -465,6 +469,7 @@ func generateJobServiceWorkflowProductionProperties() *properties.Properties { if jobServiceProdProperties == nil { jobServiceProdProperties = properties.NewProperties() jobServiceProdProperties.Set("kogito.service.url", "http://foo.default") + jobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default") jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0") jobServiceProdProperties.Set("quarkus.http.port", "8080") jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") @@ -489,7 +494,6 @@ func generateDataIndexWorkflowDevProperties() *properties.Properties { dataIndexDevProperties.Set("quarkus.devservices.enabled", "false") dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false") dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") - //TODO revisar, pero para el dev profile esto no va dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false") @@ -504,6 +508,8 @@ func generateDataIndexWorkflowProductionProperties() *properties.Properties { if dataIndexProdProperties == nil { dataIndexProdProperties = properties.NewProperties() dataIndexProdProperties.Set("kogito.service.url", "http://foo.default") + dataIndexProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default") + dataIndexProdProperties.Set("kogito.data-index.health-enabled", "true") dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexProdProperties.Set("quarkus.http.port", "8080") dataIndexProdProperties.Set("quarkus.devservices.enabled", "false") @@ -544,6 +550,9 @@ func generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Pr if dataIndexJobServiceProdProperties == nil { dataIndexJobServiceProdProperties = properties.NewProperties() dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default") + dataIndexJobServiceProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default") + dataIndexJobServiceProdProperties.Set("kogito.data-index.health-enabled", "true") + dataIndexJobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default") dataIndexJobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexJobServiceProdProperties.Set("quarkus.http.port", "8080") dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index 5594d6449..11e5bb932 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -573,11 +573,11 @@ func TestSonataFlowPlatformController(t *testing.T) { psDi := services.NewDataIndexHandler(ksp) psDi2 := services.NewDataIndexHandler(ksp2) assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url, psDi.GetLocalServiceBaseUrl()) - assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceUrl()) + assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath) psJs := services.NewJobServiceHandler(ksp) psJs2 := services.NewJobServiceHandler(ksp2) assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url, psJs.GetLocalServiceBaseUrl()) - assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceURLPath, psJs2.GetServiceUrl()) + assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceJobEventsPath, psJs2.GetServiceBaseUrl()+constants.JobServiceJobEventsPath) ksp2.Spec.Services = &v1alpha08.ServicesPlatformSpec{} diff --git a/controllers/workflowdef/utils.go b/controllers/workflowdef/utils.go new file mode 100644 index 000000000..87a0356f3 --- /dev/null +++ b/controllers/workflowdef/utils.go @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package workflowdef + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/serverlessworkflow/sdk-go/v2/model" +) + +// HasTimeouts returns true if current workflow has configured any of the SonataFlow supported timeouts, false +// in any other case. This method might be reviewed when more timeouts are supported. +func HasTimeouts(workflow *operatorapi.SonataFlow) bool { + flow := &workflow.Spec.Flow + hasTimeouts := HasWorkflowExecTimeout(flow) || HasWorkflowEventTimeout(flow) + for i := 0; !hasTimeouts && i < len(flow.States); i++ { + state := flow.States[i] + switch state.Type { + case model.StateTypeEvent: + hasTimeouts = HasEventStateTimeouts(state.EventState) + case model.StateTypeOperation: + hasTimeouts = HasOperationStateTimeouts(state.OperationState) + case model.StateTypeSwitch: + hasTimeouts = HasSwitchStateTimeouts(state.SwitchState) + case model.StateTypeSleep: + hasTimeouts = true + case model.StateTypeParallel: + hasTimeouts = HasParallelStateTimeouts(state.ParallelState) + case model.StateTypeForEach: + hasTimeouts = HasForEachStateTimeouts(state.ForEachState) + case model.StateTypeCallback: + hasTimeouts = HasCallbackStateTimeouts(state.CallbackState) + } + } + return hasTimeouts +} + +func HasWorkflowEventTimeout(flow *operatorapi.Flow) bool { + return flow.Timeouts != nil && len(flow.Timeouts.EventTimeout) > 0 +} +func HasWorkflowExecTimeout(flow *operatorapi.Flow) bool { + return flow.Timeouts != nil && flow.Timeouts.WorkflowExecTimeout != nil && len(flow.Timeouts.WorkflowExecTimeout.Duration) > 0 +} + +func HasEventStateTimeouts(state *model.EventState) bool { + if state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0 { + return true + } + for _, onEvent := range state.OnEvents { + if hasActionsWithSleep(&onEvent.Actions) { + return true + } + } + return false +} + +func HasOperationStateTimeouts(state *model.OperationState) bool { + return hasActionsWithSleep(&state.Actions) +} + +func HasSwitchStateTimeouts(state *model.SwitchState) bool { + return state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0 +} + +func HasParallelStateTimeouts(state *model.ParallelState) bool { + for _, branch := range state.Branches { + if hasBranchTimeouts(&branch) { + return true + } + } + return false +} + +func hasBranchTimeouts(branch *model.Branch) bool { + return hasActionsWithSleep(&branch.Actions) +} + +func HasForEachStateTimeouts(state *model.ForEachState) bool { + return hasActionsWithSleep(&state.Actions) +} + +func HasCallbackStateTimeouts(state *model.CallbackState) bool { + return (state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0) || hasAnySleep(&state.Action) +} + +func hasActionsWithSleep(actions *[]model.Action) bool { + for _, action := range *actions { + if hasAnySleep(&action) { + return true + } + } + return false +} + +func hasAnySleep(action *model.Action) bool { + return action.Sleep != nil && (len(action.Sleep.Before) > 0 || len(action.Sleep.After) > 0) +} diff --git a/controllers/workflowdef/utils_suite_test.go b/controllers/workflowdef/utils_suite_test.go new file mode 100644 index 000000000..aa3919a75 --- /dev/null +++ b/controllers/workflowdef/utils_suite_test.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package workflowdef + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestProperties(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Utils Suite") +} diff --git a/controllers/workflowdef/utils_test.go b/controllers/workflowdef/utils_test.go new file mode 100644 index 000000000..9e279b10d --- /dev/null +++ b/controllers/workflowdef/utils_test.go @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package workflowdef + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" +) + +var ( + emptyDuration = "" + isoDuration = "PT30S" +) + +var _ = DescribeTable("Workflow has timeouts", + func(workflow *operatorapi.SonataFlow, expectedHasTimeouts bool) { + hasTimeouts := HasTimeouts(workflow) + Expect(hasTimeouts).Should(Equal(expectedHasTimeouts)) + }, + Entry("for a workflow with WorkflowExecTimeout", workflowWithWorkflowExecTimeout(&isoDuration), true), + Entry("for a workflow with empty WorkflowExecTimeout", workflowWithWorkflowExecTimeout(&emptyDuration), false), + Entry("for a workflow with nil WorkflowExecTimeout", workflowWithWorkflowExecTimeout(&emptyDuration), false), + + Entry("for a workflow with WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(&isoDuration), true), + Entry("for a workflow with empty WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(&emptyDuration), false), + Entry("for a workflow with nil WorkflowEventTimeout", workflowWithWorkflowEventStateTimeout(nil), false), + + Entry("for a workflow with EventState with timeouts", workflowWithEventStateWithTimeout(&isoDuration), true), + Entry("for a workflow with EventState empty timeouts", workflowWithEventStateWithTimeout(&emptyDuration), false), + Entry("for a workflow with EventState nil timeouts", workflowWithEventStateWithTimeout(&emptyDuration), false), + Entry("for a workflow with EventState with action sleep at before", workflowWithEventStateWithActionSleep(true, false), true), + Entry("for a workflow with EventState with action sleep at before", workflowWithEventStateWithActionSleep(false, true), true), + + Entry("for a workflow with OperationState with action sleep at before", workflowWithEventStateWithActionSleep(true, false), true), + Entry("for a workflow with OperationState with with action sleep at after", workflowWithEventStateWithActionSleep(false, true), true), + Entry("for a workflow with OperationState with no action sleep", workflowWithEventStateWithActionSleep(false, false), false), + + Entry("for a workflow with SwitchState with timeouts", workflowWithSwitchStateWithTimeout(&isoDuration), true), + Entry("for a workflow with SwitchState with empty timeouts", workflowWithSwitchStateWithTimeout(&emptyDuration), false), + Entry("for a workflow with SwitchState with nil timeouts", workflowWithSwitchStateWithTimeout(nil), false), + + Entry("for a workflow with SleepState", workflowWithSleepState(), true), + + Entry("for a workflow with ParallelState with branch with sleep at before", workflowWithParallelState(true, false), true), + Entry("for a workflow with ParallelState with branch with sleep at after", workflowWithParallelState(false, true), true), + Entry("for a workflow with ParallelState with branches with sleep at before and after", workflowWithParallelState(true, true), true), + Entry("for a workflow with ParallelState with no sleep branches", workflowWithParallelState(false, false), false), + + Entry("for a workflow with ForEachState with action sleep at before", workflowWithForEachStateWithActionSleep(true, false), true), + Entry("for a workflow with ForEachState with with action sleep at after", workflowWithForEachStateWithActionSleep(false, true), true), + Entry("for a workflow with ForEachState with no action sleep", workflowWithForEachStateWithActionSleep(false, false), false), + + Entry("for a workflow with CallbackState with timeouts", workflowWithCallbackStateTimeoutAndActionSleep(&isoDuration, nil, nil), true), + Entry("for a workflow with CallbackState with nil timeouts and before action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, &isoDuration, nil), true), + Entry("for a workflow with CallbackState with nil timeouts and after action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, nil, &isoDuration), true), + Entry("for a workflow with CallbackState with nil timeouts and no action sleep", workflowWithCallbackStateTimeoutAndActionSleep(nil, nil, nil), false), +) + +func workflowWithWorkflowExecTimeout(duration *string) *operatorapi.SonataFlow { + wf := generateWorkflow() + if duration != nil { + wf.Spec.Flow.Timeouts = &cncfmodel.Timeouts{} + wf.Spec.Flow.Timeouts.WorkflowExecTimeout = &cncfmodel.WorkflowExecTimeout{ + Duration: *duration, + } + } + return wf +} + +func workflowWithWorkflowEventStateTimeout(duration *string) *operatorapi.SonataFlow { + wf := generateWorkflow() + if duration != nil { + wf.Spec.Flow.Timeouts = &cncfmodel.Timeouts{ + EventTimeout: *duration, + } + } + return wf +} + +func workflowWithEventStateWithTimeout(duration *string) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateEventState() + if duration != nil { + state.EventState.Timeouts = &cncfmodel.EventStateTimeout{EventTimeout: *duration} + } + wf.Spec.Flow.States = []cncfmodel.State{*state} + return wf +} + +func workflowWithEventStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateEventState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + state.EventState.OnEvents = []cncfmodel.OnEvents{ + { + Actions: generateActionsWithSleep(before, after), + }, + } + return wf +} + +func workflowWithOperationStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateOperationState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + state.OperationState.Actions = generateActionsWithSleep(before, after) + return wf +} + +func workflowWithSwitchStateWithTimeout(duration *string) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateSwitchState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + if duration != nil { + state.SwitchState.Timeouts = &cncfmodel.SwitchStateTimeout{ + EventTimeout: *duration, + } + } + return wf +} + +func workflowWithSleepState() *operatorapi.SonataFlow { + wf := generateWorkflow() + wf.Spec.Flow.States = []cncfmodel.State{*generateSleepState()} + return wf +} + +func workflowWithParallelState(branchWithBeforeSleep bool, branchWithAfterSleep bool) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateParallelState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + if branchWithBeforeSleep { + branch := cncfmodel.Branch{ + Actions: []cncfmodel.Action{{Sleep: &cncfmodel.Sleep{Before: "PT5S"}}}, + } + state.ParallelState.Branches = append(state.ParallelState.Branches, branch) + } + if branchWithAfterSleep { + branch := cncfmodel.Branch{ + Actions: []cncfmodel.Action{{Sleep: &cncfmodel.Sleep{After: "PT5S"}}}, + } + state.ParallelState.Branches = append(state.ParallelState.Branches, branch) + } + return wf +} + +func workflowWithForEachStateWithActionSleep(before bool, after bool) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateForEachState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + state.ForEachState.Actions = generateActionsWithSleep(before, after) + return wf +} + +func workflowWithCallbackStateTimeoutAndActionSleep(duration *string, before *string, after *string) *operatorapi.SonataFlow { + wf := generateWorkflow() + state := generateCallbackState() + wf.Spec.Flow.States = []cncfmodel.State{*state} + if duration != nil { + state.CallbackState.Timeouts = &cncfmodel.CallbackStateTimeout{EventTimeout: *duration} + } + state.CallbackState.Action = cncfmodel.Action{} + if before != nil || after != nil { + state.CallbackState.Action.Sleep = &cncfmodel.Sleep{} + if before != nil { + state.CallbackState.Action.Sleep.Before = *before + } + if after != nil { + state.CallbackState.Action.Sleep.After = *after + } + } + return wf +} + +func generateWorkflow() *operatorapi.SonataFlow { + wf := &operatorapi.SonataFlow{ + Spec: operatorapi.SonataFlowSpec{ + Flow: operatorapi.Flow{}, + }, + } + return wf +} + +func generateEventState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeEvent, + }, + EventState: &cncfmodel.EventState{}, + } +} + +func generateOperationState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeOperation, + }, + OperationState: &cncfmodel.OperationState{}, + } +} + +func generateSwitchState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeSwitch, + }, + SwitchState: &cncfmodel.SwitchState{}, + } +} + +func generateSleepState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeSleep, + }, + SleepState: &cncfmodel.SleepState{}, + } +} + +func generateParallelState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeParallel, + }, + ParallelState: &cncfmodel.ParallelState{ + Branches: []cncfmodel.Branch{}, + }, + } +} + +func generateForEachState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeForEach, + }, + ForEachState: &cncfmodel.ForEachState{}, + } +} + +func generateCallbackState() *cncfmodel.State { + return &cncfmodel.State{ + BaseState: cncfmodel.BaseState{ + Type: cncfmodel.StateTypeCallback, + }, + CallbackState: &cncfmodel.CallbackState{}, + } +} + +func generateActionsWithSleep(before bool, after bool) []cncfmodel.Action { + var actions []cncfmodel.Action + if before { + actions = append(actions, cncfmodel.Action{ + Sleep: &cncfmodel.Sleep{ + Before: "PT30S", + }, + }) + } + if after { + actions = append(actions, cncfmodel.Action{ + Sleep: &cncfmodel.Sleep{ + After: "PT30S", + }, + }) + } + return actions +}