From ad3b78bc2235cb056e5d691c9ee10c9528c6be7e Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Thu, 18 Jan 2024 18:52:03 +0100 Subject: [PATCH] kogito-serverless-operator-353: Enable the sending of the process definition event when the data-index is present --- controllers/platform/k8s.go | 56 ++-- controllers/platform/services/properties.go | 18 +- .../services/properties_services_test.go | 18 +- controllers/platform/services/services.go | 98 ++++--- .../common/app_properties_test.go.removed | 257 ------------------ .../common/constants/platform_services.go | 12 +- .../profiles/common/properties/application.go | 6 +- .../common/properties/application_test.go | 163 +++++------ .../sonataflowplatform_controller_test.go | 12 +- 9 files changed, 182 insertions(+), 458 deletions(-) delete mode 100644 controllers/profiles/common/app_properties_test.go.removed diff --git a/controllers/platform/k8s.go b/controllers/platform/k8s.go index d66155bf2..94e253555 100644 --- a/controllers/platform/k8s.go +++ b/controllers/platform/k8s.go @@ -63,13 +63,13 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S } if platform.Spec.Services.DataIndex != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil { + if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexHandler(platform)); err != nil { return nil, err } } if platform.Spec.Services.JobService != nil { - if err := createServiceComponents(ctx, action.client, platform, services.NewJobService(platform)); err != nil { + if err := createServiceComponents(ctx, action.client, platform, services.NewJobServiceHandler(platform)); err != nil { return nil, err } } @@ -77,17 +77,17 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S return platform, nil } -func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error { - if err := createConfigMap(ctx, client, platform, ps); err != nil { +func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + if err := createConfigMap(ctx, client, platform, psh); err != nil { return err } - if err := createDeployment(ctx, client, platform, ps); err != nil { + if err := createDeployment(ctx, client, platform, psh); err != nil { return err } - return createService(ctx, client, platform, ps) + return createService(ctx, client, platform, psh) } -func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error { +func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { readyProbe := &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ @@ -105,9 +105,9 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera liveProbe := readyProbe.DeepCopy() liveProbe.ProbeHandler.HTTPGet.Path = common.QuarkusHealthPathLive dataDeployContainer := &corev1.Container{ - Image: ps.GetServiceImageName(constants.PersistenceTypeEphemeral), - Env: ps.GetEnvironmentVariables(), - Resources: ps.GetPodResourceRequirements(), + Image: psh.GetServiceImageName(constants.PersistenceTypeEphemeral), + Env: psh.GetEnvironmentVariables(), + Resources: psh.GetPodResourceRequirements(), ReadinessProbe: readyProbe, LivenessProbe: liveProbe, Ports: []corev1.ContainerPort{ @@ -125,17 +125,17 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera }, }, } - dataDeployContainer = ps.ConfigurePersistence(dataDeployContainer) - dataDeployContainer, err := ps.MergeContainerSpec(dataDeployContainer) + dataDeployContainer = psh.ConfigurePersistence(dataDeployContainer) + dataDeployContainer, err := psh.MergeContainerSpec(dataDeployContainer) if err != nil { return err } // immutable - dataDeployContainer.Name = ps.GetContainerName() + dataDeployContainer.Name = psh.GetContainerName() - replicas := ps.GetReplicaCount() - lbl, selectorLbl := getLabels(platform, ps) + replicas := psh.GetReplicaCount() + lbl, selectorLbl := getLabels(platform, psh) dataDeploySpec := appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: selectorLbl, @@ -152,7 +152,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: ps.GetServiceCmName(), + Name: psh.GetServiceCmName(), }, }, }, @@ -162,7 +162,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera }, } - dataDeploySpec.Template.Spec, err = ps.MergePodSpec(dataDeploySpec.Template.Spec) + dataDeploySpec.Template.Spec, err = psh.MergePodSpec(dataDeploySpec.Template.Spec) if err != nil { return err } @@ -171,7 +171,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera dataDeploy := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: platform.Namespace, - Name: ps.GetServiceName(), + Name: psh.GetServiceName(), Labels: lbl, }} if err := controllerutil.SetControllerReference(platform, dataDeploy, client.Scheme()); err != nil { @@ -192,8 +192,8 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera return nil } -func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error { - lbl, selectorLbl := getLabels(platform, ps) +func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + lbl, selectorLbl := getLabels(platform, psh) dataSvcSpec := corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { @@ -208,7 +208,7 @@ func createService(ctx context.Context, client client.Client, platform *operator dataSvc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Namespace: platform.Namespace, - Name: ps.GetServiceName(), + Name: psh.GetServiceName(), Labels: lbl, }} if err := controllerutil.SetControllerReference(platform, dataSvc, client.Scheme()); err != nil { @@ -229,26 +229,26 @@ func createService(ctx context.Context, client client.Client, platform *operator return nil } -func getLabels(platform *operatorapi.SonataFlowPlatform, ps services.Platform) (map[string]string, map[string]string) { +func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (map[string]string, map[string]string) { lbl := map[string]string{ workflowproj.LabelApp: platform.Name, - workflowproj.LabelService: ps.GetServiceName(), + workflowproj.LabelService: psh.GetServiceName(), } selectorLbl := map[string]string{ - workflowproj.LabelService: ps.GetServiceName(), + workflowproj.LabelService: psh.GetServiceName(), } return lbl, selectorLbl } -func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error { - handler, err := services.NewServiceAppPropertyHandler(ps) +func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error { + handler, err := services.NewServiceAppPropertyHandler(psh) if err != nil { return err } - lbl, _ := getLabels(platform, ps) + lbl, _ := getLabels(platform, psh) configMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: ps.GetServiceCmName(), + Name: psh.GetServiceCmName(), Namespace: platform.Namespace, Labels: lbl, }, diff --git a/controllers/platform/services/properties.go b/controllers/platform/services/properties.go index 6544db6bf..5c411f38c 100644 --- a/controllers/platform/services/properties.go +++ b/controllers/platform/services/properties.go @@ -47,7 +47,7 @@ var ( type serviceAppPropertyHandler struct { userProperties string - platform Platform + serviceHandler PlatformServiceHandler defaultMutableProperties *properties.Properties } @@ -59,9 +59,9 @@ type ServiceAppPropertyHandler interface { // NewServiceAppPropertyHandler creates the default service configurations property handler // The set of properties is initialized with the operator provided immutable properties. // The set of defaultMutableProperties is initialized with the operator provided properties that the user might override. -func NewServiceAppPropertyHandler(ps Platform) (ServiceAppPropertyHandler, error) { +func NewServiceAppPropertyHandler(serviceHandler PlatformServiceHandler) (ServiceAppPropertyHandler, error) { handler := &serviceAppPropertyHandler{} - props, err := ps.GenerateServiceProperties() + props, err := serviceHandler.GenerateServiceProperties() if err != nil { return nil, err } @@ -83,7 +83,7 @@ func (a *serviceAppPropertyHandler) Build() string { props, propErr = properties.LoadString(a.userProperties) } if propErr != nil { - klog.V(log.D).InfoS("Can't load user's property", "service", a.platform.GetServiceName(), "properties", a.userProperties) + klog.V(log.D).InfoS("Can't load user's property", "service", a.serviceHandler.GetServiceName(), "properties", a.userProperties) props = properties.NewProperties() } props = utils.NewApplicationPropertiesBuilder(). @@ -158,10 +158,12 @@ func generateReactiveURL(postgresSpec *operatorapi.PersistencePostgreSql, schema // Never nil. func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) { props := properties.NewProperties() - props.Set(constants.KogitoProcessInstancesEnabled, "false") + props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false") + props.Set(constants.KogitoProcessInstancesEventsEnabled, "false") if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) { - props.Set(constants.KogitoProcessInstancesEnabled, "true") - di := NewDataIndexService(platform) + props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true") + props.Set(constants.KogitoProcessInstancesEventsEnabled, "true") + di := NewDataIndexHandler(platform) p, err := di.GenerateWorkflowProperties() if err != nil { return nil, err @@ -182,7 +184,7 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP) props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol)) if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) { - js := NewJobService(platform) + js := NewJobServiceHandler(platform) p, err := js.GenerateWorkflowProperties() if err != nil { return nil, err diff --git a/controllers/platform/services/properties_services_test.go b/controllers/platform/services/properties_services_test.go index 0ca098e02..90da80837 100644 --- a/controllers/platform/services/properties_services_test.go +++ b/controllers/platform/services/properties_services_test.go @@ -20,11 +20,10 @@ package services import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/magiconair/properties" ) @@ -33,7 +32,7 @@ var ( disabled = false ) -var _ = Describe("Platform properties", func() { +var _ = Describe("PlatformServiceHandler properties", func() { var _ = Context("for service properties", func() { @@ -41,7 +40,7 @@ var _ = Describe("Platform properties", func() { DescribeTable("Job Service", func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) { - js := NewJobService(plfm) + js := NewJobServiceHandler(plfm) handler, err := NewServiceAppPropertyHandler(js) Expect(err).NotTo(HaveOccurred()) p, err := properties.LoadString(handler.Build()) @@ -49,7 +48,7 @@ var _ = Describe("Platform properties", func() { p.Sort() Expect(p).To(Equal(expectedProperties)) }, - Entry("with an empty spec", generatePlatform(emptyJobServiceSpec()), + Entry("with an empty spec", generatePlatform(emptyJobServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")), generateJobServiceDeploymentDevProperties()), Entry("with enabled field undefined and with ephemeral persistence", generatePlatform(setJobServiceEnabledValue(nil), setPlatformName("foo"), setPlatformNamespace("default")), @@ -78,7 +77,7 @@ var _ = Describe("Platform properties", func() { ) DescribeTable("Data Index", func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) { - di := NewDataIndexService(plfm) + di := NewDataIndexHandler(plfm) handler, err := NewServiceAppPropertyHandler(di) Expect(err).NotTo(HaveOccurred()) p, err := properties.LoadString(handler.Build()) @@ -86,7 +85,7 @@ var _ = Describe("Platform properties", func() { p.Sort() Expect(p).To(Equal(expectedProperties)) }, - Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec()), generateDataIndexDeploymentProperties()), + Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")), generateDataIndexDeploymentProperties()), Entry("with postgreSQL persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default"), setJobServiceJDBC("jdbc:postgresql://postgres:5432/sonataflow?currentSchema=myschema")), generateDataIndexDeploymentProperties()), ) @@ -98,6 +97,7 @@ var _ = Describe("Platform properties", func() { func generateJobServiceDeploymentDevProperties() *properties.Properties { p := properties.NewProperties() + p.Set("kogito.service.url", "http://foo-jobs-service.default") p.Set("quarkus.devservices.enabled", "false") p.Set("quarkus.http.host", "0.0.0.0") p.Set("quarkus.http.port", "8080") @@ -109,6 +109,7 @@ func generateJobServiceDeploymentDevProperties() *properties.Properties { func generateDataIndexDeploymentProperties() *properties.Properties { p := properties.NewProperties() + p.Set("kogito.service.url", "http://foo-data-index-service.default") p.Set("quarkus.devservices.enabled", "false") p.Set("quarkus.http.host", "0.0.0.0") p.Set("quarkus.http.port", "8080") @@ -120,6 +121,7 @@ func generateDataIndexDeploymentProperties() *properties.Properties { func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properties { p := properties.NewProperties() + p.Set("kogito.service.url", "http://foo-jobs-service.default") p.Set("quarkus.devservices.enabled", "false") p.Set("quarkus.http.host", "0.0.0.0") p.Set("quarkus.http.port", "8080") @@ -132,6 +134,7 @@ func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properti func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properties.Properties { p := properties.NewProperties() + p.Set("kogito.service.url", "http://foo-jobs-service.default") p.Set("kogito.jobs-service.http.job-status-change-events", "true") p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs") p.Set("quarkus.devservices.enabled", "false") @@ -145,6 +148,7 @@ func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properti func generateJobServiceDeploymentWithDataIndexAndPostgreSQLProperties() *properties.Properties { p := properties.NewProperties() + p.Set("kogito.service.url", "http://foo-jobs-service.default") p.Set("kogito.jobs-service.http.job-status-change-events", "true") p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs") p.Set("quarkus.devservices.enabled", "false") diff --git a/controllers/platform/services/services.go b/controllers/platform/services/services.go index 1c1d032ab..1e1a94246 100644 --- a/controllers/platform/services/services.go +++ b/controllers/platform/services/services.go @@ -34,7 +34,7 @@ import ( "github.com/imdario/mergo" ) -type Platform interface { +type PlatformServiceHandler interface { // GetContainerName returns the name of the service's container in the deployment. GetContainerName() string // GetServiceImageName returns the image name of the service's container. It takes in the service and persistence types and returns a string @@ -46,21 +46,21 @@ type Platform interface { GetServiceCmName() string // GetEnvironmentVariables returns the env variables to be injected to the service container GetEnvironmentVariables() []corev1.EnvVar - // GetResourceLimits returns the pod's memory and CPU resource requirements + // GetPodResourceRequirements returns the pod's memory and CPU resource requirements // Values for job service taken from // https://github.com/parodos-dev/orchestrator-helm-chart/blob/52d09eda56fdbed3060782df29847c97f172600f/charts/orchestrator/values.yaml#L68-L72 GetPodResourceRequirements() corev1.ResourceRequirements - // GetReplicaCountForService Returns the default pod replica count for the given service + // GetReplicaCount Returns the default pod replica count for the given service GetReplicaCount() int32 // MergeContainerSpec performs a merge with override using the containerSpec argument and the expected values based on the service's pod template specifications. The returning // object is the merged result MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) - //ConfigurePersistence sets the persistence's image and environment values when it is defined in the Persistence field of the service, overriding any existing value. + // ConfigurePersistence sets the persistence's image and environment values when it is defined in the Persistence field of the service, overriding any existing value. ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container - //MergePodSpec performs a merge with override between the podSpec argument and the expected values based on the service's pod template specification. The returning + // MergePodSpec performs a merge with override between the podSpec argument and the expected values based on the service's pod template specification. The returning // object is the result of the merge MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) // GenerateWorkflowProperties returns a property object that contains the service's application properties required by workflows @@ -69,19 +69,19 @@ type Platform interface { GenerateServiceProperties() (*properties.Properties, error) } -type DataIndex struct { +type DataIndexHandler struct { platform *operatorapi.SonataFlowPlatform } -func NewDataIndexService(platform *operatorapi.SonataFlowPlatform) Platform { - return DataIndex{platform: platform} +func NewDataIndexHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { + return DataIndexHandler{platform: platform} } -func (d DataIndex) GetContainerName() string { +func (d DataIndexHandler) GetContainerName() string { return constants.DataIndexServiceName } -func (d DataIndex) GetServiceImageName(persistenceName string) string { +func (d DataIndexHandler) GetServiceImageName(persistenceName string) string { var tag = version.GetMajorMinor() var suffix = "" if version.IsSnapshot() { @@ -93,11 +93,11 @@ func (d DataIndex) GetServiceImageName(persistenceName string) string { return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.DataIndexName, persistenceName+suffix, tag) } -func (d DataIndex) GetServiceName() string { +func (d DataIndexHandler) GetServiceName() string { return fmt.Sprintf("%s-%s", d.platform.Name, constants.DataIndexServiceName) } -func (d DataIndex) GetEnvironmentVariables() []corev1.EnvVar { +func (d DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { Name: "KOGITO_DATA_INDEX_QUARKUS_PROFILE", @@ -114,7 +114,7 @@ func (d DataIndex) GetEnvironmentVariables() []corev1.EnvVar { } } -func (d DataIndex) GetPodResourceRequirements() corev1.ResourceRequirements { +func (d DataIndexHandler) GetPodResourceRequirements() corev1.ResourceRequirements { return corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("100m"), @@ -123,14 +123,13 @@ func (d DataIndex) GetPodResourceRequirements() corev1.ResourceRequirements { } } -func (d DataIndex) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { +func (d DataIndexHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { c := podSpec.DeepCopy() err := mergo.Merge(c, d.platform.Spec.Services.DataIndex.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride) return *c, err } -func (d DataIndex) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { - +func (d DataIndexHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { if d.platform.Spec.Services.DataIndex.Persistence != nil && d.platform.Spec.Services.DataIndex.Persistence.PostgreSql != nil { c := containerSpec.DeepCopy() c.Image = d.GetServiceImageName(constants.PersistenceTypePostgreSQL) @@ -140,24 +139,24 @@ func (d DataIndex) ConfigurePersistence(containerSpec *corev1.Container) *corev1 return containerSpec } -func (d DataIndex) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) { +func (d DataIndexHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) { c := containerSpec.DeepCopy() err := mergo.Merge(c, d.platform.Spec.Services.DataIndex.PodTemplate.Container.ToContainer(), mergo.WithOverride) return c, err } -func (d DataIndex) GetReplicaCount() int32 { +func (d DataIndexHandler) GetReplicaCount() int32 { if d.platform.Spec.Services.DataIndex.PodTemplate.Replicas != nil { return *d.platform.Spec.Services.DataIndex.PodTemplate.Replicas } return 1 } -func (d DataIndex) GetServiceCmName() string { +func (d DataIndexHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", d.GetServiceName()) } -func (d DataIndex) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string) []corev1.EnvVar { +func (d DataIndexHandler) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string) []corev1.EnvVar { dataSourcePort := constants.DefaultPostgreSQLPort databaseName := "sonataflow" dataSourceURL := postgresql.JdbcUrl @@ -225,33 +224,36 @@ func (d DataIndex) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePos } } -func (d DataIndex) GenerateWorkflowProperties() (*properties.Properties, error) { +func (d DataIndexHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() if d.platform.Spec.Services.DataIndex != nil { - props.Set(constants.DataIndexServiceURLProperty, fmt.Sprintf("%s://%s.%s/processes", constants.DataIndexServiceURLProtocol, d.GetServiceName(), d.platform.Namespace)) + dataIndexUrl := generateServiceURL(constants.KogitoProcessEventsProtocol, d.platform.Namespace, d.GetServiceName()) + props.Set(constants.KogitoProcessDefinitionsEventsURL, fmt.Sprintf("%s/definitions", dataIndexUrl)) + props.Set(constants.KogitoProcessInstancesEventsURL, fmt.Sprintf("%s/processes", dataIndexUrl)) } return props, nil } -func (d DataIndex) GenerateServiceProperties() (*properties.Properties, error) { +func (d DataIndexHandler) GenerateServiceProperties() (*properties.Properties, error) { props := properties.NewProperties() + props.Set(constants.KogitoServiceURLProperty, generateServiceURL(constants.KogitoServiceURLProtocol, d.platform.Namespace, d.GetServiceName())) props.Set(constants.DataIndexKafkaSmallRyeHealthProperty, "false") return props, nil } -type JobService struct { +type JobServiceHandler struct { platform *operatorapi.SonataFlowPlatform } -func NewJobService(platform *operatorapi.SonataFlowPlatform) Platform { - return JobService{platform: platform} +func NewJobServiceHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler { + return JobServiceHandler{platform: platform} } -func (j JobService) GetContainerName() string { +func (j JobServiceHandler) GetContainerName() string { return constants.JobServiceName } -func (j JobService) GetServiceImageName(persistenceName string) string { +func (j JobServiceHandler) GetServiceImageName(persistenceName string) string { var tag = version.GetMajorMinor() var suffix = "" if version.IsSnapshot() { @@ -263,15 +265,15 @@ func (j JobService) GetServiceImageName(persistenceName string) string { return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.JobServiceName, persistenceName+suffix, tag) } -func (j JobService) GetServiceName() string { +func (j JobServiceHandler) GetServiceName() string { return fmt.Sprintf("%s-%s", j.platform.Name, constants.JobServiceName) } -func (j JobService) GetServiceCmName() string { +func (j JobServiceHandler) GetServiceCmName() string { return fmt.Sprintf("%s-props", j.GetServiceName()) } -func (j JobService) GetEnvironmentVariables() []corev1.EnvVar { +func (j JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar { return []corev1.EnvVar{ { Name: "QUARKUS_HTTP_CORS", @@ -284,7 +286,7 @@ func (j JobService) GetEnvironmentVariables() []corev1.EnvVar { } } -func (j JobService) GetPodResourceRequirements() corev1.ResourceRequirements { +func (j JobServiceHandler) GetPodResourceRequirements() corev1.ResourceRequirements { return corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("250m"), @@ -297,17 +299,17 @@ func (j JobService) GetPodResourceRequirements() corev1.ResourceRequirements { } } -func (j JobService) GetReplicaCount() int32 { +func (j JobServiceHandler) GetReplicaCount() int32 { return 1 } -func (j JobService) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) { +func (j JobServiceHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) { c := containerSpec.DeepCopy() err := mergo.Merge(c, j.platform.Spec.Services.JobService.PodTemplate.Container.ToContainer(), mergo.WithOverride) return c, err } -func (j JobService) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { +func (j JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container { if j.platform.Spec.Services.JobService.Persistence != nil && j.platform.Spec.Services.JobService.Persistence.PostgreSql != nil { c := containerSpec.DeepCopy() @@ -318,13 +320,13 @@ func (j JobService) ConfigurePersistence(containerSpec *corev1.Container) *corev return containerSpec } -func (j JobService) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { +func (j JobServiceHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) { c := podSpec.DeepCopy() err := mergo.Merge(c, j.platform.Spec.Services.JobService.PodTemplate.PodSpec.ToPodSpec(), mergo.WithOverride) return *c, err } -func (j JobService) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string) []corev1.EnvVar { +func (j JobServiceHandler) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePostgreSql, databaseSchema, databaseNamespace string) []corev1.EnvVar { dataSourcePort := constants.DefaultPostgreSQLPort databaseName := "sonataflow" dataSourceURL := postgresql.JdbcUrl @@ -389,8 +391,9 @@ func (j JobService) configurePostgreSqlEnv(postgresql *operatorapi.PersistencePo } } -func (j JobService) GenerateServiceProperties() (*properties.Properties, error) { +func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties, error) { props := properties.NewProperties() + props.Set(constants.KogitoServiceURLProperty, generateServiceURL(constants.KogitoServiceURLProtocol, j.platform.Namespace, j.GetServiceName())) props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false") // add data source reactive URL jspec := j.platform.Spec.Services.JobService @@ -402,18 +405,17 @@ func (j JobService) GenerateServiceProperties() (*properties.Properties, error) props.Set(constants.JobServiceDataSourceReactiveURL, dataSourceReactiveURL) } if dataIndexEnabled(j.platform) { - di := NewDataIndexService(j.platform) + di := NewDataIndexHandler(j.platform) props.Set(constants.JobServiceStatusChangeEvents, "true") - props.Set(constants.JobServiceStatusChangeEventsURL, fmt.Sprintf("%s://%s.%s/jobs", constants.DataIndexServiceURLProtocol, di.GetServiceName(), j.platform.Namespace)) + props.Set(constants.JobServiceStatusChangeEventsURL, fmt.Sprintf("%s/jobs", generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, di.GetServiceName()))) } props.Sort() return props, nil } -func (j JobService) GenerateWorkflowProperties() (*properties.Properties, error) { +func (j JobServiceHandler) GenerateWorkflowProperties() (*properties.Properties, error) { props := properties.NewProperties() - // add data source reactive URL - props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://%s.%s/v2/jobs/events", constants.JobServiceURLProtocol, j.GetServiceName(), j.platform.Namespace)) + props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s/v2/jobs/events", generateServiceURL(constants.KogitoProcessEventsProtocol, j.platform.Namespace, j.GetServiceName()))) return props, nil } @@ -425,3 +427,13 @@ func dataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool { func jobServiceEnabled(platform *operatorapi.SonataFlowPlatform) bool { return platform != nil && platform.Spec.Services.JobService != nil && platform.Spec.Services.JobService.Enabled != nil && *platform.Spec.Services.JobService.Enabled } + +func generateServiceURL(protocol string, namespace string, name string) string { + var serviceUrl string + if len(namespace) > 0 { + serviceUrl = fmt.Sprintf("%s://%s.%s", protocol, name, namespace) + } else { + serviceUrl = fmt.Sprintf("%s://%s", protocol, name) + } + return serviceUrl +} diff --git a/controllers/profiles/common/app_properties_test.go.removed b/controllers/profiles/common/app_properties_test.go.removed deleted file mode 100644 index 8e0bd0336..000000000 --- a/controllers/profiles/common/app_properties_test.go.removed +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package common - -import ( - "context" - "fmt" - "testing" - - "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" - - "github.com/magiconair/properties" - - "github.com/stretchr/testify/assert" - - "github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata" - "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" - "github.com/apache/incubator-kie-kogito-serverless-operator/test" -) - -const ( - defaultNamespace = "default-namespace" - namespace1 = "namespace1" - myService1 = "my-service1" - myService1Address = "http://10.110.90.1:80" - myService2 = "my-service2" - myService2Address = "http://10.110.90.2:80" - myService3 = "my-service3" - myService3Address = "http://10.110.90.3:80" - - myKnService1 = "my-kn-service1" - myKnService1Address = "http://my-kn-sevice1.namespace1.svc.cluster.local" - - myKnService2 = "my-kn-service2" - myKnService2Address = "http://my-kn-sevice2.namespace1.svc.cluster.local" - - myKnService3 = "my-kn-service3" - myKnService3Address = "http://my-kn-sevice3.default-namespace.svc.cluster.local" - - myKnBroker1 = "my-kn-broker1" - myKnBroker1Address = "http://broker-ingress.knative-eventing.svc.cluster.local/namespace1/my-kn-broker1" - - myKnBroker2 = "my-kn-broker2" - myKnBroker2Address = "http://broker-ingress.knative-eventing.svc.cluster.local/default-namespace/my-kn-broker2" -) - -type mockCatalogService struct { -} - -func (c *mockCatalogService) Query(ctx context.Context, uri discovery.ResourceUri, outputFormat string) (string, error) { - if uri.Scheme == discovery.KubernetesScheme && uri.Namespace == namespace1 && uri.Name == myService1 { - return myService1Address, nil - } - if uri.Scheme == discovery.KubernetesScheme && uri.Name == myService2 && uri.Namespace == defaultNamespace { - return myService2Address, nil - } - if uri.Scheme == discovery.KubernetesScheme && uri.Name == myService3 && uri.Namespace == defaultNamespace && uri.GetPort() == "http-port" { - return myService3Address, nil - } - if uri.Scheme == discovery.KnativeScheme && uri.Name == myKnService1 && uri.Namespace == namespace1 { - return myKnService1Address, nil - } - if uri.Scheme == discovery.KnativeScheme && uri.Name == myKnService2 && uri.Namespace == namespace1 { - return myKnService2Address, nil - } - if uri.Scheme == discovery.KnativeScheme && uri.Name == myKnService3 && uri.Namespace == defaultNamespace { - return myKnService3Address, nil - } - if uri.Scheme == discovery.KnativeScheme && uri.Name == myKnBroker1 && uri.Namespace == namespace1 { - return myKnBroker1Address, nil - } - if uri.Scheme == discovery.KnativeScheme && uri.Name == myKnBroker2 && uri.Namespace == defaultNamespace { - return myKnBroker2Address, nil - } - - return "", nil -} - -func Test_appPropertyHandler_WithKogitoServiceUrl(t *testing.T) { - workflow := test.GetBaseSonataFlow("default") - props := ImmutableApplicationProperties(workflow, nil) - assert.Contains(t, props, kogitoServiceUrlProperty) - assert.Contains(t, props, "http://"+workflow.Name+"."+workflow.Namespace) -} - -func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) { - //just add some user provided properties, no overrides. - userProperties := "property1=value1\nproperty2=value2" - workflow := test.GetBaseSonataFlow("default") - props := NewAppPropertyHandler(workflow, nil).WithUserProperties(userProperties).Build() - generatedProps, propsErr := properties.LoadString(props) - assert.NoError(t, propsErr) - assert.Equal(t, 8, len(generatedProps.Keys())) - assert.Equal(t, "value1", generatedProps.GetString("property1", "")) - assert.Equal(t, "value2", generatedProps.GetString("property2", "")) - assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", "")) - assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", "")) - assert.Equal(t, "0.0.0.0", generatedProps.GetString("quarkus.http.host", "")) - assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) - assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) - assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) -} - -func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { - //try to override kogito.service.url and quarkus.http.port - userProperties := "property1=value1\nproperty2=value2\nquarkus.http.port=9090\nkogito.service.url=http://myUrl.override.com\nquarkus.http.port=9090" - ns := "default" - workflow := test.GetBaseSonataFlow(ns) - enabled := true - platform := test.GetBasePlatform() - platform.Namespace = ns - platform.Spec = v1alpha08.SonataFlowPlatformSpec{ - Services: v1alpha08.ServicesPlatformSpec{ - DataIndex: &v1alpha08.ServiceSpec{ - Enabled: &enabled, - }, - }, - } - - props := NewAppPropertyHandler(workflow, platform).WithUserProperties(userProperties).Build() - generatedProps, propsErr := properties.LoadString(props) - assert.NoError(t, propsErr) - assert.Equal(t, 8, len(generatedProps.Keys())) - assert.Equal(t, "value1", generatedProps.GetString("property1", "")) - assert.Equal(t, "value2", generatedProps.GetString("property2", "")) - //kogito.service.url takes the user provided value since it's a default mutable property. - assert.Equal(t, "http://myUrl.override.com", generatedProps.GetString("kogito.service.url", "")) - //quarkus.http.port remains with the default value since it's immutable. - assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", "")) - assert.Equal(t, "0.0.0.0", generatedProps.GetString("quarkus.http.host", "")) - assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) - assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) - assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) - assert.Equal(t, "", generatedProps.GetString(dataIndexServiceUrlProperty, "")) - - // prod profile enables config of outgoing events url - workflow.SetAnnotations(map[string]string{metadata.Profile: string(metadata.ProdProfile)}) - props = NewAppPropertyHandler(workflow, platform).WithUserProperties(userProperties).Build() - generatedProps, propsErr = properties.LoadString(props) - assert.NoError(t, propsErr) - assert.Equal(t, 9, len(generatedProps.Keys())) - assert.Equal(t, "http://"+platform.Name+"-"+DataIndexName+"."+platform.Namespace+"/processes", generatedProps.GetString(dataIndexServiceUrlProperty, "")) - - // disabling data index bypasses config of outgoing events url - platform.Spec.Services.DataIndex.Enabled = nil - props = NewAppPropertyHandler(workflow, platform).WithUserProperties(userProperties).Build() - generatedProps, propsErr = properties.LoadString(props) - assert.NoError(t, propsErr) - assert.Equal(t, 8, len(generatedProps.Keys())) - assert.Equal(t, "", generatedProps.GetString(dataIndexServiceUrlProperty, "")) - - // check that service app properties are being properly set - props = NewServiceAppPropertyHandler(platform).WithUserProperties(userProperties).Build() - generatedProps, propsErr = properties.LoadString(props) - assert.NoError(t, propsErr) - assert.Equal(t, 9, len(generatedProps.Keys())) - assert.Equal(t, "false", generatedProps.GetString(kafkaSmallRyeHealthProperty, "")) - assert.Equal(t, "value1", generatedProps.GetString("property1", "")) - assert.Equal(t, "value2", generatedProps.GetString("property2", "")) - //quarkus.http.port remains with the default value since it's immutable. - assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", "")) -} - -func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T) { - //just add some user provided properties, no overrides. - userProperties := "property1=value1\nproperty2=value2\n" - //add some user properties that requires service discovery - userProperties = userProperties + "service1=${kubernetes:services.v1/namespace1/my-service1}\n" - userProperties = userProperties + "service2=${kubernetes:services.v1/my-service2}\n" - userProperties = userProperties + "service3=${knative:namespace1/my-kn-service1}\n" - userProperties = userProperties + "service4=${knative:services.v1.serving.knative.dev/namespace1/my-kn-service2}\n" - userProperties = userProperties + "service5=${knative:services.v1.serving.knative.dev/my-kn-service3}\n" - userProperties = userProperties + "broker1=${knative:brokers.v1.eventing.knative.dev/namespace1/my-kn-broker1}\n" - userProperties = userProperties + "broker2=${knative:brokers.v1.eventing.knative.dev/my-kn-broker2}\n" - - workflow := test.GetBaseSonataFlow(defaultNamespace) - props := NewAppPropertyHandler(workflow, nil). - WithUserProperties(userProperties). - WithServiceDiscovery(context.TODO(), &mockCatalogService{}). - Build() - generatedProps, propsErr := properties.LoadString(props) - generatedProps.DisableExpansion = true - assert.NoError(t, propsErr) - assert.Equal(t, 22, len(generatedProps.Keys())) - assertHasProperty(t, generatedProps, "property1", "value1") - assertHasProperty(t, generatedProps, "property2", "value2") - - assertHasProperty(t, generatedProps, "service1", "${kubernetes:services.v1/namespace1/my-service1}") - assertHasProperty(t, generatedProps, "service2", "${kubernetes:services.v1/my-service2}") - assertHasProperty(t, generatedProps, "service3", "${knative:namespace1/my-kn-service1}") - - //org.kie.kogito.addons.discovery.kubernetes\:services.v1\/usecase1ยบ/my-service1 below we use the unescaped vale because the properties.LoadString removes them. - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.kubernetes:services.v1/namespace1/my-service1", myService1Address) - //org.kie.kogito.addons.discovery.kubernetes\:services.v1\/my-service2 below we use the unescaped vale because the properties.LoadString removes them. - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.kubernetes:services.v1/my-service2", myService2Address) - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.knative:namespace1/my-kn-service1", myKnService1Address) - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.knative:services.v1.serving.knative.dev/namespace1/my-kn-service2", myKnService2Address) - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.knative:services.v1.serving.knative.dev/my-kn-service3", myKnService3Address) - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.knative:brokers.v1.eventing.knative.dev/namespace1/my-kn-broker1", myKnBroker1Address) - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.knative:brokers.v1.eventing.knative.dev/my-kn-broker2", myKnBroker2Address) - - assertHasProperty(t, generatedProps, "kogito.service.url", fmt.Sprintf("http://greeting.%s", defaultNamespace)) - assertHasProperty(t, generatedProps, "quarkus.http.port", "8080") - assertHasProperty(t, generatedProps, "quarkus.http.host", "0.0.0.0") - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false") - assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false") - assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false") -} - -func assertHasProperty(t *testing.T, props *properties.Properties, expectedProperty string, expectedValue string) { - value, ok := props.Get(expectedProperty) - assert.True(t, ok, "Property %s, is not present as expected.", expectedProperty) - assert.Equal(t, expectedValue, value, "Expected value for property: %s, is: %s but current value is: %s", expectedProperty, expectedValue, value) -} - -func Test_generateMicroprofileServiceCatalogProperty(t *testing.T) { - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:services.v1/namespace1/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/namespace1\\/financial-service") - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:services.v1/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/financial-service") - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:pods.v1/namespace1/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:pods.v1\\/namespace1\\/financial-service") - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:pods.v1/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:pods.v1\\/financial-service") - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:deployments.v1.apps/namespace1/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:deployments.v1.apps\\/namespace1\\/financial-service") - - doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:deployments.v1.apps/financial-service", - "org.kie.kogito.addons.discovery.kubernetes\\:deployments.v1.apps\\/financial-service") -} - -func doTestGenerateMicroprofileServiceCatalogProperty(t *testing.T, serviceUri string, expectedProperty string) { - mpProperty := generateMicroprofileServiceCatalogProperty(serviceUri) - assert.Equal(t, mpProperty, expectedProperty, "expected microprofile service catalog property for serviceUri: %s, is %s, but the returned value was: %s", serviceUri, expectedProperty, mpProperty) -} diff --git a/controllers/profiles/common/constants/platform_services.go b/controllers/profiles/common/constants/platform_services.go index 2f4ab5d86..99d295633 100644 --- a/controllers/profiles/common/constants/platform_services.go +++ b/controllers/profiles/common/constants/platform_services.go @@ -26,9 +26,6 @@ const ( PersistenceTypePostgreSQL = "postgresql" PersistenceTypeEphemeral = "ephemeral" - DataIndexServiceURLProperty = "mp.messaging.outgoing.kogito-processinstances-events.url" - DataIndexServiceURLProtocol = "http" - JobServiceRequestEventsURL = "mp.messaging.outgoing.kogito-job-service-job-request-events.url" JobServiceRequestEventsConnector = "mp.messaging.outgoing.kogito-job-service-job-request-events.connector" JobServiceStatusChangeEvents = "kogito.jobs-service.http.job-status-change-events" @@ -36,9 +33,12 @@ const ( JobServiceURLProtocol = "http" JobServiceDataSourceReactiveURL = "quarkus.datasource.reactive.url" - KogitoProcessInstancesEnabled = "kogito.events.processinstances.enabled" - KogitoProcessDefinitionsEnabled = "kogito.events.processdefinitions.enabled" - KogitoEventsUserTaskEnabled = "kogito.events.usertasks.enabled" + KogitoProcessEventsProtocol = "http" + KogitoProcessInstancesEventsURL = "mp.messaging.outgoing.kogito-processinstances-events.url" + KogitoProcessInstancesEventsEnabled = "kogito.events.processinstances.enabled" + KogitoProcessDefinitionsEventsURL = "mp.messaging.outgoing.kogito-processdefinitions-events.url" + KogitoProcessDefinitionsEventsEnabled = "kogito.events.processdefinitions.enabled" + KogitoUserTasksEventsEnabled = "kogito.events.usertasks.enabled" KogitoEventsVariablesEnabled = "kogito.events.variables.enabled" KogitoServiceURLProperty = "kogito.service.url" KogitoServiceURLProtocol = "http" diff --git a/controllers/profiles/common/properties/application.go b/controllers/profiles/common/properties/application.go index a89744c1e..fda4ec1e4 100644 --- a/controllers/profiles/common/properties/application.go +++ b/controllers/profiles/common/properties/application.go @@ -40,7 +40,7 @@ import ( var ( immutableApplicationProperties = fmt.Sprintf("quarkus.http.port=%d\n"+ "quarkus.http.host=0.0.0.0\n"+ - // We disable the Knative health checks to not block the dev pod to run if Knative objects are not available + // We disable the Knative health checks to not block the pod to run if Knative objects are not available // See: https://kiegroup.github.io/kogito-docs/serverlessworkflow/latest/eventing/consume-produce-events-with-knative-eventing.html#ref-knative-eventing-add-on-source-configuration "org.kie.kogito.addons.knative.eventing.health-enabled=false\n"+ "quarkus.devservices.enabled=false\n"+ @@ -149,9 +149,7 @@ func NewAppPropertyHandler(workflow *operatorapi.SonataFlow, platform *operatora platform: platform, } props := properties.NewProperties() - props.Set(constants.KogitoEventsUserTaskEnabled, "false") - props.Set(constants.KogitoEventsVariablesEnabled, "false") - props.Set(constants.KogitoProcessDefinitionsEnabled, "false") + props.Set(constants.KogitoUserTasksEventsEnabled, "false") if platform != nil { p, err := services.GenerateDataIndexWorkflowProperties(workflow, platform) if err != nil { diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go index 4d3c0672f..a4c5dcf33 100644 --- a/controllers/profiles/common/properties/application_test.go +++ b/controllers/profiles/common/properties/application_test.go @@ -124,7 +124,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) assert.NoError(t, err) generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 11, len(generatedProps.Keys())) + assert.Equal(t, 9, len(generatedProps.Keys())) assert.Equal(t, "value1", generatedProps.GetString("property1", "")) assert.Equal(t, "value2", generatedProps.GetString("property2", "")) assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", "")) @@ -133,9 +133,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEnabled, "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoEventsUserTaskEnabled, "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoEventsVariablesEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) } func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T) { @@ -159,7 +157,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T Build()) generatedProps.DisableExpansion = true assert.NoError(t, propsErr) - assert.Equal(t, 25, len(generatedProps.Keys())) + assert.Equal(t, 23, len(generatedProps.Keys())) assertHasProperty(t, generatedProps, "property1", "value1") assertHasProperty(t, generatedProps, "property2", "value2") @@ -183,9 +181,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false") assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false") assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false") - assertHasProperty(t, generatedProps, constants.KogitoProcessDefinitionsEnabled, "false") - assertHasProperty(t, generatedProps, constants.KogitoEventsUserTaskEnabled, "false") - assertHasProperty(t, generatedProps, constants.KogitoEventsVariablesEnabled, "false") + assertHasProperty(t, generatedProps, constants.KogitoUserTasksEventsEnabled, "false") } func assertHasProperty(t *testing.T, props *properties.Properties, expectedProperty string, expectedValue string) { @@ -218,7 +214,7 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.NoError(t, err) generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 14, len(generatedProps.Keys())) + assert.Equal(t, 13, len(generatedProps.Keys())) assert.Equal(t, "value1", generatedProps.GetString("property1", "")) assert.Equal(t, "value2", generatedProps.GetString("property2", "")) @@ -230,11 +226,12 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) - assert.Equal(t, "", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) assert.Equal(t, "http://localhost/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEnabled, "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoEventsUserTaskEnabled, "")) - assert.Equal(t, "false", generatedProps.GetString(constants.KogitoEventsVariablesEnabled, "")) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessInstancesEventsEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) // prod profile enables config of outgoing events url workflow.SetAnnotations(map[string]string{metadata.Profile: string(metadata.ProdProfile)}) @@ -243,8 +240,12 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) assert.Equal(t, 15, len(generatedProps.Keys())) - assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) + assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/definitions", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) + assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) + assert.Equal(t, "http://"+platform.Name+"-"+constants.DataIndexServiceName+"."+platform.Namespace+"/processes", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) + assert.Equal(t, "true", generatedProps.GetString(constants.KogitoProcessInstancesEventsEnabled, "")) assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) @@ -255,8 +256,12 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.NoError(t, err) generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 14, len(generatedProps.Keys())) - assert.Equal(t, "", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) + assert.Equal(t, 13, len(generatedProps.Keys())) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessInstancesEventsEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) @@ -267,53 +272,16 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.NoError(t, err) generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 14, len(generatedProps.Keys())) - assert.Equal(t, "", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) + assert.Equal(t, 13, len(generatedProps.Keys())) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessDefinitionsEventsEnabled, "")) + assert.Equal(t, "", generatedProps.GetString(constants.KogitoProcessInstancesEventsURL, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoProcessInstancesEventsEnabled, "")) + assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) assert.Equal(t, "http://localhost/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceDataSourceReactiveURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) - - // check that the reactive URL is generated from the postgreSQL JDBC URL when not provided - platform.Spec.Services.JobService = &operatorapi.ServiceSpec{ - Enabled: &enabled, - Persistence: &operatorapi.PersistenceOptions{ - PostgreSql: &operatorapi.PersistencePostgreSql{ - ServiceRef: &operatorapi.PostgreSqlServiceOptions{ - Name: "jobs-service", - }, - }, - }, - } - props, err = NewAppPropertyHandler(workflow, platform) - assert.NoError(t, err) - generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) - assert.NoError(t, propsErr) - assert.Equal(t, 14, len(generatedProps.Keys())) - assert.Equal(t, "", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) - assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) - assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) - assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) - - // check that the reactive URL is generated from the postgreSQL JDBC URL when provided - platform.Spec.Services.JobService = &operatorapi.ServiceSpec{ - Enabled: &enabled, - Persistence: &operatorapi.PersistenceOptions{ - PostgreSql: &operatorapi.PersistencePostgreSql{ - JdbcUrl: "jdbc:postgresql://timeouts-showcase-database:5432/postgres?currentSchema=jobs-service", - }, - }, - } - props, err = NewAppPropertyHandler(workflow, platform) - assert.NoError(t, err) - generatedProps, propsErr = properties.LoadString(props.WithUserProperties(userProperties).Build()) - assert.NoError(t, propsErr) - assert.Equal(t, 14, len(generatedProps.Keys())) - assert.Equal(t, "", generatedProps.GetString(constants.DataIndexServiceURLProperty, "")) - assert.Equal(t, "http://sonataflow-platform-jobs-service.default/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) - assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) - assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) - } var _ = Describe("Platform properties", func() { @@ -477,18 +445,17 @@ var _ = Describe("Platform properties", func() { func generateJobServiceWorkflowDevProperties() *properties.Properties { if jobServiceDevProperties == nil { jobServiceDevProperties = properties.NewProperties() - jobServiceDevProperties.Set("kogito.events.processdefinitions.enabled", "false") - jobServiceDevProperties.Set("quarkus.devservices.enabled", "false") jobServiceDevProperties.Set("kogito.service.url", "http://foo.default") - jobServiceDevProperties.Set("quarkus.kogito.devservices.enabled", "false") jobServiceDevProperties.Set("quarkus.http.host", "0.0.0.0") jobServiceDevProperties.Set("quarkus.http.port", "8080") - jobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") + jobServiceDevProperties.Set("quarkus.devservices.enabled", "false") + jobServiceDevProperties.Set("quarkus.kogito.devservices.enabled", "false") jobServiceDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + jobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") + jobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + jobServiceDevProperties.Set("kogito.events.processdefinitions.enabled", "false") jobServiceDevProperties.Set("kogito.events.processinstances.enabled", "false") jobServiceDevProperties.Set("kogito.events.usertasks.enabled", "false") - jobServiceDevProperties.Set("kogito.events.variables.enabled", "false") - jobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") jobServiceDevProperties.Sort() } return jobServiceDevProperties @@ -497,18 +464,17 @@ func generateJobServiceWorkflowDevProperties() *properties.Properties { func generateJobServiceWorkflowProductionProperties() *properties.Properties { if jobServiceProdProperties == nil { jobServiceProdProperties = properties.NewProperties() - jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") - jobServiceProdProperties.Set("kogito.events.processdefinitions.enabled", "false") - jobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + jobServiceProdProperties.Set("kogito.service.url", "http://foo.default") jobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0") jobServiceProdProperties.Set("quarkus.http.port", "8080") - jobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://foo-jobs-service.default/v2/jobs/events") - jobServiceProdProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + jobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") jobServiceProdProperties.Set("quarkus.devservices.enabled", "false") + jobServiceProdProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + jobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + jobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://foo-jobs-service.default/v2/jobs/events") + jobServiceProdProperties.Set("kogito.events.processdefinitions.enabled", "false") jobServiceProdProperties.Set("kogito.events.processinstances.enabled", "false") jobServiceProdProperties.Set("kogito.events.usertasks.enabled", "false") - jobServiceProdProperties.Set("kogito.events.variables.enabled", "false") - jobServiceProdProperties.Set("kogito.service.url", "http://foo.default") jobServiceProdProperties.Sort() } return jobServiceProdProperties @@ -517,18 +483,18 @@ func generateJobServiceWorkflowProductionProperties() *properties.Properties { func generateDataIndexWorkflowDevProperties() *properties.Properties { if dataIndexDevProperties == nil { dataIndexDevProperties = properties.NewProperties() - dataIndexDevProperties.Set("kogito.events.variables.enabled", "false") - dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexDevProperties.Set("kogito.service.url", "http://foo.default") + dataIndexDevProperties.Set("quarkus.http.host", "0.0.0.0") + dataIndexDevProperties.Set("quarkus.http.port", "8080") dataIndexDevProperties.Set("quarkus.devservices.enabled", "false") dataIndexDevProperties.Set("quarkus.kogito.devservices.enabled", "false") - dataIndexDevProperties.Set("kogito.service.url", "http://foo.default") dataIndexDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") - dataIndexDevProperties.Set("quarkus.http.port", "8080") + //TODO revisar, pero para el dev profile esto no va + dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") dataIndexDevProperties.Set("kogito.events.processdefinitions.enabled", "false") dataIndexDevProperties.Set("kogito.events.processinstances.enabled", "false") dataIndexDevProperties.Set("kogito.events.usertasks.enabled", "false") - dataIndexDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") - dataIndexDevProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexDevProperties.Sort() } return dataIndexDevProperties @@ -537,19 +503,19 @@ func generateDataIndexWorkflowDevProperties() *properties.Properties { func generateDataIndexWorkflowProductionProperties() *properties.Properties { if dataIndexProdProperties == nil { dataIndexProdProperties = properties.NewProperties() - dataIndexProdProperties.Set("kogito.events.variables.enabled", "false") - dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexProdProperties.Set("kogito.service.url", "http://foo.default") + dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0") + dataIndexProdProperties.Set("quarkus.http.port", "8080") dataIndexProdProperties.Set("quarkus.devservices.enabled", "false") dataIndexProdProperties.Set("quarkus.kogito.devservices.enabled", "false") - dataIndexProdProperties.Set("kogito.service.url", "http://foo.default") dataIndexProdProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") + dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-processdefinitions-events.url", "http://foo-data-index-service.default/definitions") dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-processinstances-events.url", "http://foo-data-index-service.default/processes") - dataIndexProdProperties.Set("quarkus.http.port", "8080") - dataIndexProdProperties.Set("kogito.events.processdefinitions.enabled", "false") + dataIndexProdProperties.Set("kogito.events.processdefinitions.enabled", "true") dataIndexProdProperties.Set("kogito.events.processinstances.enabled", "true") dataIndexProdProperties.Set("kogito.events.usertasks.enabled", "false") - dataIndexProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") - dataIndexProdProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexProdProperties.Sort() } return dataIndexProdProperties @@ -558,18 +524,17 @@ func generateDataIndexWorkflowProductionProperties() *properties.Properties { func generateDataIndexAndJobServiceWorkflowDevProperties() *properties.Properties { if dataIndexJobServiceDevProperties == nil { dataIndexJobServiceDevProperties = properties.NewProperties() - dataIndexJobServiceDevProperties.Set("quarkus.kogito.devservices.enabled", "false") - dataIndexJobServiceDevProperties.Set("kogito.events.processdefinitions.enabled", "false") - dataIndexJobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexJobServiceDevProperties.Set("kogito.service.url", "http://foo.default") dataIndexJobServiceDevProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexJobServiceDevProperties.Set("quarkus.http.port", "8080") - dataIndexJobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") - dataIndexJobServiceDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + dataIndexJobServiceDevProperties.Set("quarkus.kogito.devservices.enabled", "false") dataIndexJobServiceDevProperties.Set("quarkus.devservices.enabled", "false") + dataIndexJobServiceDevProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + dataIndexJobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexJobServiceDevProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://localhost/v2/jobs/events") + dataIndexJobServiceDevProperties.Set("kogito.events.processdefinitions.enabled", "false") dataIndexJobServiceDevProperties.Set("kogito.events.processinstances.enabled", "false") dataIndexJobServiceDevProperties.Set("kogito.events.usertasks.enabled", "false") - dataIndexJobServiceDevProperties.Set("kogito.events.variables.enabled", "false") - dataIndexJobServiceDevProperties.Set("kogito.service.url", "http://foo.default") dataIndexJobServiceDevProperties.Sort() } return dataIndexJobServiceDevProperties @@ -578,18 +543,18 @@ func generateDataIndexAndJobServiceWorkflowDevProperties() *properties.Propertie func generateDataIndexAndJobServiceWorkflowProductionProperties() *properties.Properties { if dataIndexJobServiceProdProperties == nil { dataIndexJobServiceProdProperties = properties.NewProperties() - dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") - dataIndexJobServiceProdProperties.Set("kogito.events.processdefinitions.enabled", "false") - dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default") dataIndexJobServiceProdProperties.Set("quarkus.http.host", "0.0.0.0") dataIndexJobServiceProdProperties.Set("quarkus.http.port", "8080") - dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://foo-jobs-service.default/v2/jobs/events") - dataIndexJobServiceProdProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + dataIndexJobServiceProdProperties.Set("quarkus.kogito.devservices.enabled", "false") dataIndexJobServiceProdProperties.Set("quarkus.devservices.enabled", "false") + dataIndexJobServiceProdProperties.Set("org.kie.kogito.addons.knative.eventing.health-enabled", "false") + dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.connector", "quarkus-http") + dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-job-service-job-request-events.url", "http://foo-jobs-service.default/v2/jobs/events") + dataIndexJobServiceProdProperties.Set("kogito.events.processdefinitions.enabled", "true") dataIndexJobServiceProdProperties.Set("kogito.events.processinstances.enabled", "true") dataIndexJobServiceProdProperties.Set("kogito.events.usertasks.enabled", "false") - dataIndexJobServiceProdProperties.Set("kogito.events.variables.enabled", "false") - dataIndexJobServiceProdProperties.Set("kogito.service.url", "http://foo.default") + dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-processdefinitions-events.url", "http://foo-data-index-service.default/definitions") dataIndexJobServiceProdProperties.Set("mp.messaging.outgoing.kogito-processinstances-events.url", "http://foo-data-index-service.default/processes") dataIndexJobServiceProdProperties.Sort() } diff --git a/controllers/sonataflowplatform_controller_test.go b/controllers/sonataflowplatform_controller_test.go index 901465e60..db0ba0805 100644 --- a/controllers/sonataflowplatform_controller_test.go +++ b/controllers/sonataflowplatform_controller_test.go @@ -127,7 +127,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Check data index deployment dep := &appsv1.Deployment{} - di := services.NewDataIndexService(ksp) + di := services.NewDataIndexHandler(ksp) assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: di.GetServiceName(), Namespace: ksp.Namespace}, dep)) env := corev1.EnvVar{ @@ -179,7 +179,7 @@ func TestSonataFlowPlatformController(t *testing.T) { }, } - di := services.NewDataIndexService(ksp) + di := services.NewDataIndexHandler(ksp) // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() @@ -295,7 +295,7 @@ func TestSonataFlowPlatformController(t *testing.T) { // Check data index deployment dep := &appsv1.Deployment{} - js := services.NewJobService(ksp) + js := services.NewJobServiceHandler(ksp) assert.NoError(t, cl.Get(context.TODO(), types.NamespacedName{Name: js.GetServiceName(), Namespace: ksp.Namespace}, dep)) assert.Len(t, dep.Spec.Template.Spec.Containers, 1) @@ -345,7 +345,7 @@ func TestSonataFlowPlatformController(t *testing.T) { }, } - js := services.NewJobService(ksp) + js := services.NewJobServiceHandler(ksp) // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() @@ -420,8 +420,8 @@ func TestSonataFlowPlatformController(t *testing.T) { JobService: &v1alpha08.ServiceSpec{}, } - di := services.NewDataIndexService(ksp) - js := services.NewJobService(ksp) + di := services.NewDataIndexHandler(ksp) + js := services.NewJobServiceHandler(ksp) // Create a fake client to mock API calls. cl := test.NewKogitoClientBuilderWithOpenShift().WithRuntimeObjects(ksp).WithStatusSubresource(ksp).Build() // Create a SonataFlowPlatformReconciler object with the scheme and fake client.