Skip to content

Commit

Permalink
kie-kogito-serverless-operator-361: Add data-index and job service st…
Browse files Browse the repository at this point in the history
…artupProbes to the workflow Deployment (apache#377)
  • Loading branch information
wmedvede authored and rgdoliveira committed Feb 5, 2024
1 parent 0a8e554 commit 5151fee
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 20 deletions.
6 changes: 6 additions & 0 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net/url"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"

"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -164,6 +166,7 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf
if workflow != nil && !profiles.IsDevProfile(workflow) && di.IsServiceEnabled() {
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
Expand All @@ -186,6 +189,9 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
js := NewJobServiceHandler(platform)
if workflow != nil && !profiles.IsDevProfile(workflow) && js.IsServiceEnabled() {
if workflowdef.HasTimeouts(workflow) {
props.Set(constants.KogitoJobServiceHealthCheckEnabled, "true")
}
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand Down
17 changes: 4 additions & 13 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type PlatformServiceHandler interface {
GetLocalServiceBaseUrl() string
// GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service.
GetServiceBaseUrl() string
// GetServiceUrl returns the service url, based on whether using local or cluster-scoped service.
GetServiceUrl() string
// IsServiceEnabled returns true if the service is enabled in either the spec or the status.clusterPlatformRef.
IsServiceEnabled() bool
// SetServiceUrlInStatus sets the service url in status. if reconciled instance does not have service set in spec AND
Expand Down Expand Up @@ -150,10 +148,6 @@ func (d DataIndexHandler) IsServiceEnabled() bool {
return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus()
}

func (d DataIndexHandler) GetServiceUrl() string {
return d.GetServiceBaseUrl() + constants.KogitoProcessInstancesEventsPath
}

func (d DataIndexHandler) GetServiceBaseUrl() string {
if d.IsServiceEnabledInSpec() {
return d.GetLocalServiceBaseUrl()
Expand Down Expand Up @@ -236,8 +230,9 @@ func (d DataIndexHandler) GetServiceCmName() string {
func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) {
props := properties.NewProperties()
if d.IsServiceEnabled() {
props.Set(constants.KogitoDataIndexURL, d.GetServiceBaseUrl())
props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessDefinitionsEventsPath)
props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceUrl())
props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
}
return props, nil
}
Expand Down Expand Up @@ -313,10 +308,6 @@ func (j JobServiceHandler) IsServiceEnabled() bool {
return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus()
}

func (j JobServiceHandler) GetServiceUrl() string {
return j.GetServiceBaseUrl() + constants.JobServiceURLPath
}

func (j JobServiceHandler) GetServiceBaseUrl() string {
if j.IsServiceEnabledInSpec() {
return j.GetLocalServiceBaseUrl()
Expand Down Expand Up @@ -411,8 +402,8 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties,
func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) {
props := properties.NewProperties()
if j.IsServiceEnabled() {
// add data source reactive URL
props.Set(constants.JobServiceRequestEventsURL, j.GetServiceUrl())
props.Set(constants.KogitoJobServiceURL, j.GetServiceBaseUrl())
props.Set(constants.JobServiceRequestEventsURL, j.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)
}
return props, nil
}
Expand Down
13 changes: 11 additions & 2 deletions controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
JobServiceURLProtocol = "http"
JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url"
JobServiceURLPath = "/v2/jobs/events"
JobServiceJobEventsPath = "/v2/jobs/events"

KogitoProcessEventsProtocol = "http"
KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url"
Expand All @@ -42,7 +42,16 @@ const (
KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled"
KogitoProcessDefinitionsEventsPath = "/definitions"
KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled"
KogitoEventsVariablesEnabled = "kogito.events.variables.enabled"
// KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part
// of its start health check.
KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
// KogitoDataIndexURL configures the data index url, this value can be used internally by the workflow.
KogitoDataIndexURL = "kogito.data-index.url"
// KogitoJobServiceHealthCheckEnabled configures if a workflow must check for the job service availability as part
// of its start health check.
KogitoJobServiceHealthCheckEnabled = "kogito.jobs-service.health-enabled"
// KogitoJobServiceURL configures the jobs service, this value can be used internally by the workflow.
KogitoJobServiceURL = "kogito.jobs-service.url"
KogitoServiceURLProperty = "kogito.service.url"
KogitoServiceURLProtocol = "http"
DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled`
Expand Down
15 changes: 12 additions & 3 deletions controllers/profiles/common/properties/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 15, len(generatedProps.Keys()))
assert.Equal(t, 18, len(generatedProps.Keys()))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -249,14 +249,17 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoDataIndexHealthCheckEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoDataIndexURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling data index bypasses config of outgoing events url
platform.Spec.Services.DataIndex.Enabled = nil
props, err = NewAppPropertyHandler(workflow, platform)
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 13, len(generatedProps.Keys()))
assert.Equal(t, 14, len(generatedProps.Keys()))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -265,6 +268,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling job service bypasses config of outgoing events url
platform.Spec.Services.JobService.Enabled = nil
Expand Down Expand Up @@ -465,6 +469,7 @@ func generateJobServiceWorkflowProductionProperties() *properties.Properties {
if jobServiceProdProperties == nil {
jobServiceProdProperties = properties.NewProperties()
jobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
jobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
jobServiceProdProperties.Set("quarkus.http.port", "8080")
jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand All @@ -489,7 +494,6 @@ func generateDataIndexWorkflowDevProperties() *properties.Properties {
dataIndexDevProperties.Set("quarkus.devservices.enabled", "false")
dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false")
dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false")
//TODO revisar, pero para el dev profile esto no va
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http")
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events")
dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false")
Expand All @@ -504,6 +508,8 @@ func generateDataIndexWorkflowProductionProperties() *properties.Properties {
if dataIndexProdProperties == nil {
dataIndexProdProperties = properties.NewProperties()
dataIndexProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexProdProperties.Set("quarkus.http.port", "8080")
dataIndexProdProperties.Set("quarkus.devservices.enabled", "false")
Expand Down Expand Up @@ -544,6 +550,9 @@ func generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Pr
if dataIndexJobServiceProdProperties == nil {
dataIndexJobServiceProdProperties = properties.NewProperties()
dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexJobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
dataIndexJobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexJobServiceProdProperties.Set("quarkus.http.port", "8080")
dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand Down
4 changes: 2 additions & 2 deletions controllers/sonataflowplatform_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ func TestSonataFlowPlatformController(t *testing.T) {
psDi := services.NewDataIndexHandler(ksp)
psDi2 := services.NewDataIndexHandler(ksp2)
assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url, psDi.GetLocalServiceBaseUrl())
assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceUrl())
assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
psJs := services.NewJobServiceHandler(ksp)
psJs2 := services.NewJobServiceHandler(ksp2)
assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url, psJs.GetLocalServiceBaseUrl())
assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceURLPath, psJs2.GetServiceUrl())
assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceJobEventsPath, psJs2.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)

ksp2.Spec.Services = &v1alpha08.ServicesPlatformSpec{}

Expand Down
113 changes: 113 additions & 0 deletions controllers/workflowdef/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/serverlessworkflow/sdk-go/v2/model"
)

// HasTimeouts returns true if current workflow has configured any of the SonataFlow supported timeouts, false
// in any other case. This method might be reviewed when more timeouts are supported.
func HasTimeouts(workflow *operatorapi.SonataFlow) bool {
flow := &workflow.Spec.Flow
hasTimeouts := HasWorkflowExecTimeout(flow) || HasWorkflowEventTimeout(flow)
for i := 0; !hasTimeouts && i < len(flow.States); i++ {
state := flow.States[i]
switch state.Type {
case model.StateTypeEvent:
hasTimeouts = HasEventStateTimeouts(state.EventState)
case model.StateTypeOperation:
hasTimeouts = HasOperationStateTimeouts(state.OperationState)
case model.StateTypeSwitch:
hasTimeouts = HasSwitchStateTimeouts(state.SwitchState)
case model.StateTypeSleep:
hasTimeouts = true
case model.StateTypeParallel:
hasTimeouts = HasParallelStateTimeouts(state.ParallelState)
case model.StateTypeForEach:
hasTimeouts = HasForEachStateTimeouts(state.ForEachState)
case model.StateTypeCallback:
hasTimeouts = HasCallbackStateTimeouts(state.CallbackState)
}
}
return hasTimeouts
}

func HasWorkflowEventTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && len(flow.Timeouts.EventTimeout) > 0
}
func HasWorkflowExecTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && flow.Timeouts.WorkflowExecTimeout != nil && len(flow.Timeouts.WorkflowExecTimeout.Duration) > 0
}

func HasEventStateTimeouts(state *model.EventState) bool {
if state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0 {
return true
}
for _, onEvent := range state.OnEvents {
if hasActionsWithSleep(&onEvent.Actions) {
return true
}
}
return false
}

func HasOperationStateTimeouts(state *model.OperationState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasSwitchStateTimeouts(state *model.SwitchState) bool {
return state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0
}

func HasParallelStateTimeouts(state *model.ParallelState) bool {
for _, branch := range state.Branches {
if hasBranchTimeouts(&branch) {
return true
}
}
return false
}

func hasBranchTimeouts(branch *model.Branch) bool {
return hasActionsWithSleep(&branch.Actions)
}

func HasForEachStateTimeouts(state *model.ForEachState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasCallbackStateTimeouts(state *model.CallbackState) bool {
return (state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0) || hasAnySleep(&state.Action)
}

func hasActionsWithSleep(actions *[]model.Action) bool {
for _, action := range *actions {
if hasAnySleep(&action) {
return true
}
}
return false
}

func hasAnySleep(action *model.Action) bool {
return action.Sleep != nil && (len(action.Sleep.Before) > 0 || len(action.Sleep.After) > 0)
}
32 changes: 32 additions & 0 deletions controllers/workflowdef/utils_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestProperties(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Utils Suite")
}
Loading

0 comments on commit 5151fee

Please sign in to comment.