Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kie-kogito-serverless-operator-361: Add data-index and job service st… #377

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net/url"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"

"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -164,6 +166,7 @@ func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platf
if workflow != nil && !profiles.IsDevProfile(workflow) && di.IsServiceEnabled() {
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
props.Set(constants.KogitoDataIndexHealthCheckEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
Expand All @@ -186,6 +189,9 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
js := NewJobServiceHandler(platform)
if workflow != nil && !profiles.IsDevProfile(workflow) && js.IsServiceEnabled() {
if workflowdef.HasTimeouts(workflow) {
props.Set(constants.KogitoJobServiceHealthCheckEnabled, "true")
}
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand Down
17 changes: 4 additions & 13 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ type PlatformServiceHandler interface {
GetLocalServiceBaseUrl() string
// GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service.
GetServiceBaseUrl() string
// GetServiceUrl returns the service url, based on whether using local or cluster-scoped service.
GetServiceUrl() string
// IsServiceEnabled returns true if the service is enabled in either the spec or the status.clusterPlatformRef.
IsServiceEnabled() bool
// SetServiceUrlInStatus sets the service url in status. if reconciled instance does not have service set in spec AND
Expand Down Expand Up @@ -150,10 +148,6 @@ func (d DataIndexHandler) IsServiceEnabled() bool {
return d.IsServiceEnabledInSpec() || d.isServiceEnabledInStatus()
}

func (d DataIndexHandler) GetServiceUrl() string {
return d.GetServiceBaseUrl() + constants.KogitoProcessInstancesEventsPath
}

func (d DataIndexHandler) GetServiceBaseUrl() string {
if d.IsServiceEnabledInSpec() {
return d.GetLocalServiceBaseUrl()
Expand Down Expand Up @@ -236,8 +230,9 @@ func (d DataIndexHandler) GetServiceCmName() string {
func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) {
props := properties.NewProperties()
if d.IsServiceEnabled() {
props.Set(constants.KogitoDataIndexURL, d.GetServiceBaseUrl())
props.Set(constants.KogitoProcessDefinitionsEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessDefinitionsEventsPath)
props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceUrl())
props.Set(constants.KogitoProcessInstancesEventsURL, d.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
}
return props, nil
}
Expand Down Expand Up @@ -313,10 +308,6 @@ func (j JobServiceHandler) IsServiceEnabled() bool {
return j.IsServiceEnabledInSpec() || j.isServiceEnabledInStatus()
}

func (j JobServiceHandler) GetServiceUrl() string {
return j.GetServiceBaseUrl() + constants.JobServiceURLPath
}

func (j JobServiceHandler) GetServiceBaseUrl() string {
if j.IsServiceEnabledInSpec() {
return j.GetLocalServiceBaseUrl()
Expand Down Expand Up @@ -411,8 +402,8 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties,
func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) {
props := properties.NewProperties()
if j.IsServiceEnabled() {
// add data source reactive URL
props.Set(constants.JobServiceRequestEventsURL, j.GetServiceUrl())
props.Set(constants.KogitoJobServiceURL, j.GetServiceBaseUrl())
props.Set(constants.JobServiceRequestEventsURL, j.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)
}
return props, nil
}
Expand Down
13 changes: 11 additions & 2 deletions controllers/profiles/common/constants/platform_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
JobServiceStatusChangeEventsURL = "mp.messaging.outgoing.kogito-job-service-job-status-events-http.url"
JobServiceURLProtocol = "http"
JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url"
JobServiceURLPath = "/v2/jobs/events"
JobServiceJobEventsPath = "/v2/jobs/events"

KogitoProcessEventsProtocol = "http"
KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url"
Expand All @@ -42,7 +42,16 @@ const (
KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled"
KogitoProcessDefinitionsEventsPath = "/definitions"
KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled"
KogitoEventsVariablesEnabled = "kogito.events.variables.enabled"
// KogitoDataIndexHealthCheckEnabled configures if a workflow must check for the data index availability as part
// of its start health check.
KogitoDataIndexHealthCheckEnabled = "kogito.data-index.health-enabled"
// KogitoDataIndexURL configures the data index url, this value can be used internally by the workflow.
KogitoDataIndexURL = "kogito.data-index.url"
// KogitoJobServiceHealthCheckEnabled configures if a workflow must check for the job service availability as part
// of its start health check.
KogitoJobServiceHealthCheckEnabled = "kogito.jobs-service.health-enabled"
// KogitoJobServiceURL configures the jobs service, this value can be used internally by the workflow.
KogitoJobServiceURL = "kogito.jobs-service.url"
KogitoServiceURLProperty = "kogito.service.url"
KogitoServiceURLProtocol = "http"
DataIndexKafkaSmallRyeHealthProperty = `quarkus.smallrye-health.check."io.quarkus.kafka.client.health.KafkaHealthCheck".enabled`
Expand Down
15 changes: 12 additions & 3 deletions controllers/profiles/common/properties/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 15, len(generatedProps.Keys()))
assert.Equal(t, 18, len(generatedProps.Keys()))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -249,14 +249,17 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "true", generatedProps.GetString(constants.KogitoDataIndexHealthCheckEnabled, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoDataIndexURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling data index bypasses config of outgoing events url
platform.Spec.Services.DataIndex.Enabled = nil
props, err = NewAppPropertyHandler(workflow, platform)
assert.NoError(t, err)
generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build())
assert.NoError(t, propsErr)
assert.Equal(t, 13, len(generatedProps.Keys()))
assert.Equal(t, 14, len(generatedProps.Keys()))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, ""))
assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, ""))
assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, ""))
Expand All @@ -265,6 +268,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, ""))
assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, ""))
assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, ""))

// disabling job service bypasses config of outgoing events url
platform.Spec.Services.JobService.Enabled = nil
Expand Down Expand Up @@ -465,6 +469,7 @@ func generateJobServiceWorkflowProductionProperties() *properties.Properties {
if jobServiceProdProperties == nil {
jobServiceProdProperties = properties.NewProperties()
jobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
jobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
jobServiceProdProperties.Set("quarkus.http.port", "8080")
jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand All @@ -489,7 +494,6 @@ func generateDataIndexWorkflowDevProperties() *properties.Properties {
dataIndexDevProperties.Set("quarkus.devservices.enabled", "false")
dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false")
dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false")
//TODO revisar, pero para el dev profile esto no va
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http")
dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events")
dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false")
Expand All @@ -504,6 +508,8 @@ func generateDataIndexWorkflowProductionProperties() *properties.Properties {
if dataIndexProdProperties == nil {
dataIndexProdProperties = properties.NewProperties()
dataIndexProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexProdProperties.Set("quarkus.http.port", "8080")
dataIndexProdProperties.Set("quarkus.devservices.enabled", "false")
Expand Down Expand Up @@ -544,6 +550,9 @@ func generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Pr
if dataIndexJobServiceProdProperties == nil {
dataIndexJobServiceProdProperties = properties.NewProperties()
dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.url", "http://foo-data-index-service.default")
dataIndexJobServiceProdProperties.Set("kogito.data-index.health-enabled", "true")
dataIndexJobServiceProdProperties.Set("kogito.jobs-service.url", "http://foo-jobs-service.default")
dataIndexJobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0")
dataIndexJobServiceProdProperties.Set("quarkus.http.port", "8080")
dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false")
Expand Down
4 changes: 2 additions & 2 deletions controllers/sonataflowplatform_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ func TestSonataFlowPlatformController(t *testing.T) {
psDi := services.NewDataIndexHandler(ksp)
psDi2 := services.NewDataIndexHandler(ksp2)
assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.DataIndexRef.Url, psDi.GetLocalServiceBaseUrl())
assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceUrl())
assert.Equal(t, psDi.GetLocalServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath, psDi2.GetServiceBaseUrl()+constants.KogitoProcessInstancesEventsPath)
psJs := services.NewJobServiceHandler(ksp)
psJs2 := services.NewJobServiceHandler(ksp2)
assert.Equal(t, ksp2.Status.ClusterPlatformRef.Services.JobServiceRef.Url, psJs.GetLocalServiceBaseUrl())
assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceURLPath, psJs2.GetServiceUrl())
assert.Equal(t, psJs.GetLocalServiceBaseUrl()+constants.JobServiceJobEventsPath, psJs2.GetServiceBaseUrl()+constants.JobServiceJobEventsPath)

ksp2.Spec.Services = &v1alpha08.ServicesPlatformSpec{}

Expand Down
113 changes: 113 additions & 0 deletions controllers/workflowdef/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/serverlessworkflow/sdk-go/v2/model"
)

// HasTimeouts returns true if current workflow has configured any of the SonataFlow supported timeouts, false
// in any other case. This method might be reviewed when more timeouts are supported.
func HasTimeouts(workflow *operatorapi.SonataFlow) bool {
flow := &workflow.Spec.Flow
hasTimeouts := HasWorkflowExecTimeout(flow) || HasWorkflowEventTimeout(flow)
for i := 0; !hasTimeouts && i < len(flow.States); i++ {
state := flow.States[i]
switch state.Type {
case model.StateTypeEvent:
hasTimeouts = HasEventStateTimeouts(state.EventState)
case model.StateTypeOperation:
hasTimeouts = HasOperationStateTimeouts(state.OperationState)
case model.StateTypeSwitch:
hasTimeouts = HasSwitchStateTimeouts(state.SwitchState)
case model.StateTypeSleep:
hasTimeouts = true
case model.StateTypeParallel:
hasTimeouts = HasParallelStateTimeouts(state.ParallelState)
case model.StateTypeForEach:
hasTimeouts = HasForEachStateTimeouts(state.ForEachState)
case model.StateTypeCallback:
hasTimeouts = HasCallbackStateTimeouts(state.CallbackState)
}
}
return hasTimeouts
}

func HasWorkflowEventTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && len(flow.Timeouts.EventTimeout) > 0
}
func HasWorkflowExecTimeout(flow *operatorapi.Flow) bool {
return flow.Timeouts != nil && flow.Timeouts.WorkflowExecTimeout != nil && len(flow.Timeouts.WorkflowExecTimeout.Duration) > 0
}

func HasEventStateTimeouts(state *model.EventState) bool {
if state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0 {
return true
}
for _, onEvent := range state.OnEvents {
if hasActionsWithSleep(&onEvent.Actions) {
return true
}
}
return false
}

func HasOperationStateTimeouts(state *model.OperationState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasSwitchStateTimeouts(state *model.SwitchState) bool {
return state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0
}

func HasParallelStateTimeouts(state *model.ParallelState) bool {
for _, branch := range state.Branches {
if hasBranchTimeouts(&branch) {
return true
}
}
return false
}

func hasBranchTimeouts(branch *model.Branch) bool {
return hasActionsWithSleep(&branch.Actions)
}

func HasForEachStateTimeouts(state *model.ForEachState) bool {
return hasActionsWithSleep(&state.Actions)
}

func HasCallbackStateTimeouts(state *model.CallbackState) bool {
return (state.Timeouts != nil && len(state.Timeouts.EventTimeout) > 0) || hasAnySleep(&state.Action)
}

func hasActionsWithSleep(actions *[]model.Action) bool {
for _, action := range *actions {
if hasAnySleep(&action) {
return true
}
}
return false
}

func hasAnySleep(action *model.Action) bool {
return action.Sleep != nil && (len(action.Sleep.Before) > 0 || len(action.Sleep.After) > 0)
}
32 changes: 32 additions & 0 deletions controllers/workflowdef/utils_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package workflowdef

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestProperties(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Utils Suite")
}
Loading
Loading