Skip to content

Commit

Permalink
kie-kogito-serverless-operator-361: Add data-index and job service st…
Browse files Browse the repository at this point in the history
…artupProbes to the workflow Deployment
  • Loading branch information
wmedvede committed Jan 30, 2024
1 parent 7ba75e5 commit 02aebd9
Show file tree
Hide file tree
Showing 7 changed files with 462 additions and 5 deletions.
6 changes: 6 additions & 0 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net/url"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"

"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -163,6 +165,7 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf
if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) {
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
Expand All @@ -185,6 +188,9 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) {
js := NewJobServiceHandler(platform)
if workflowdef.HasTimeouts(workflow) {
props.Set(constants.KogitoJobServiceHealthCheckEnabled, "true")
}
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties,
props := properties.NewProperties()
if d.platform.Spec.Services.DataIndex != nil {
dataIndexUrl := generateServiceURL(constants.KogitoProcessEventsProtocol, d.platform.Namespace, d.GetServiceName())
props.Set(constants.KogitoDataIndexURL, dataIndexUrl)
props.Set(constants.KogitoProcessDefinitionsEventsURL, fmt.Sprintf("%s/definitions", dataIndexUrl))
props.Set(constants.KogitoProcessInstancesEventsURL, fmt.Sprintf("%s/processes", dataIndexUrl))
}
Expand Down Expand Up @@ -295,7 +296,9 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties,

func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s/v2/jobs/events", generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, j.GetServiceName())))
jobServiceUrl := generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, j.GetServiceName())
props.Set(constants.KogitoJobServiceURL, jobServiceUrl)
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s/v2/jobs/events", jobServiceUrl))
return props, nil
}

Expand Down
11 changes: 10 additions & 1 deletion controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,16 @@ const (
KogitoProcessDefinitionsEventsURL = "mp.messaging.outgoing.kogito-processdefinitions-events.url"
KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled"
KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled"
KogitoEventsVariablesEnabled = "kogito.events.variables.enabled"
// KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part
// of its start health check.
KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
// KogitoDataIndexURL configures the data index url, this value can be used internally by the workflow.
KogitoDataIndexURL = "kogito.data-index.url"
// KogitoJobServiceHealthCheckEnabled configures if a workflow must check for the job service availability as part
// of its start health check.
KogitoJobServiceHealthCheckEnabled = "kogito.jobs-service.health-enabled"
// KogitoJobServiceURL configures the jobs service, this value can be used internally by the workflow.
KogitoJobServiceURL = "kogito.jobs-service.url"
KogitoServiceURLProperty = "kogito.service.url"
KogitoServiceURLProtocol = "http"
DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled`
Expand Down
15 changes: 12 additions & 3 deletions controllers/profiles/common/properties/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 15, len(generatedProps.Keys()))
assert.Equal(t, 18, len(generatedProps.Keys()))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -249,14 +249,17 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoDataIndexHealthCheckEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoDataIndexURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling data index bypasses config of outgoing events url
platform.Spec.Services.DataIndex.Enabled = nil
props, err = NewAppPropertyHandler(workflow, platform)
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 13, len(generatedProps.Keys()))
assert.Equal(t, 14, len(generatedProps.Keys()))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -265,6 +268,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling job service bypasses config of outgoing events url
platform.Spec.Services.JobService.Enabled = nil
Expand Down Expand Up @@ -465,6 +469,7 @@ func generateJobServiceWorkflowProductionProperties() *properties.Properties {
if jobServiceProdProperties == nil {
jobServiceProdProperties = properties.NewProperties()
jobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
jobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
jobServiceProdProperties.Set("quarkus.http.port", "8080")
jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand All @@ -489,7 +494,6 @@ func generateDataIndexWorkflowDevProperties() *properties.Properties {
dataIndexDevProperties.Set("quarkus.devservices.enabled", "false")
dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false")
dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false")
//TODO revisar, pero para el dev profile esto no va
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http")
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events")
dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false")
Expand All @@ -504,6 +508,8 @@ func generateDataIndexWorkflowProductionProperties() *properties.Properties {
if dataIndexProdProperties == nil {
dataIndexProdProperties = properties.NewProperties()
dataIndexProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexProdProperties.Set("quarkus.http.port", "8080")
dataIndexProdProperties.Set("quarkus.devservices.enabled", "false")
Expand Down Expand Up @@ -544,6 +550,9 @@ func generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Pr
if dataIndexJobServiceProdProperties == nil {
dataIndexJobServiceProdProperties = properties.NewProperties()
dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexJobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
dataIndexJobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexJobServiceProdProperties.Set("quarkus.http.port", "8080")
dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand Down
113 changes: 113 additions & 0 deletions controllers/workflowdef/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/serverlessworkflow/sdk-go/v2/model"
)

// HasTimeouts returns true if current workflow has configured any of the SonataFlow supported timeouts, false
// in any other case. This method might be reviewed when more timeouts are supported.
func HasTimeouts(workflow *operatorapi.SonataFlow) bool {
flow := &workflow.Spec.Flow
hasTimeouts := HasWorkflowExecTimeout(flow) || HasWorkflowEventTimeout(flow)
for i := 0; !hasTimeouts && i < len(flow.States); i++ {
state := flow.States[i]
switch state.Type {
case model.StateTypeEvent:
hasTimeouts = HasEventStateTimeouts(state.EventState)
case model.StateTypeOperation:
hasTimeouts = HasOperationStateTimeouts(state.OperationState)
case model.StateTypeSwitch:
hasTimeouts = HasSwitchStateTimeouts(state.SwitchState)
case model.StateTypeSleep:
hasTimeouts = true
case model.StateTypeParallel:
hasTimeouts = HasParallelStateTimeouts(state.ParallelState)
case model.StateTypeForEach:
hasTimeouts = HasForEachStateTimeouts(state.ForEachState)
case model.StateTypeCallback:
hasTimeouts = HasCallbackStateTimeouts(state.CallbackState)
}
}
return hasTimeouts
}

func HasWorkflowEventTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && len(flow.Timeouts.EventTimeout) > 0
}
func HasWorkflowExecTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && flow.Timeouts.WorkflowExecTimeout != nil && len(flow.Timeouts.WorkflowExecTimeout.Duration) > 0
}

func HasEventStateTimeouts(state *model.EventState) bool {
if state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0 {
return true
}
for _, onEvent := range state.OnEvents {
if hasActionsWithSleep(&onEvent.Actions) {
return true
}
}
return false
}

func HasOperationStateTimeouts(state *model.OperationState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasSwitchStateTimeouts(state *model.SwitchState) bool {
return state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0
}

func HasParallelStateTimeouts(state *model.ParallelState) bool {
for _, branch := range state.Branches {
if hasBranchTimeouts(&branch) {
return true
}
}
return false
}

func hasBranchTimeouts(branch *model.Branch) bool {
return hasActionsWithSleep(&branch.Actions)
}

func HasForEachStateTimeouts(state *model.ForEachState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasCallbackStateTimeouts(state *model.CallbackState) bool {
return (state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0) || hasAnySleep(&state.Action)
}

func hasActionsWithSleep(actions *[]model.Action) bool {
for _, action := range *actions {
if hasAnySleep(&action) {
return true
}
}
return false
}

func hasAnySleep(action *model.Action) bool {
return action.Sleep != nil && (len(action.Sleep.Before) > 0 || len(action.Sleep.After) > 0)
}
32 changes: 32 additions & 0 deletions controllers/workflowdef/utils_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestProperties(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Utils Suite")
}
Loading

0 comments on commit 02aebd9

Please sign in to comment.