Skip to content

Commit

Permalink
kogito-serverless-operator-353: Enable the sending of the process def…
Browse files Browse the repository at this point in the history
…inition event when the data-index is present (apache#358)
  • Loading branch information
wmedvede authored and rgdoliveira committed Jan 29, 2024
1 parent 4d5a81d commit 14e27e5
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 458 deletions.
56 changes: 28 additions & 28 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,31 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
}

if platform.Spec.Services.DataIndex != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexHandler(platform)); err != nil {
return nil, err
}
}

if platform.Spec.Services.JobService != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewJobService(platform)); err != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewJobServiceHandler(platform)); err != nil {
return nil, err
}
}

return platform, nil
}

func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
if err := createConfigMap(ctx, client, platform, ps); err != nil {
func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
if err := createConfigMap(ctx, client, platform, psh); err != nil {
return err
}
if err := createDeployment(ctx, client, platform, ps); err != nil {
if err := createDeployment(ctx, client, platform, psh); err != nil {
return err
}
return createService(ctx, client, platform, ps)
return createService(ctx, client, platform, psh)
}

func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
readyProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Expand All @@ -105,9 +105,9 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
liveProbe := readyProbe.DeepCopy()
liveProbe.ProbeHandler.HTTPGet.Path = common.QuarkusHealthPathLive
dataDeployContainer := &corev1.Container{
Image: ps.GetServiceImageName(constants.PersistenceTypeEphemeral),
Env: ps.GetEnvironmentVariables(),
Resources: ps.GetPodResourceRequirements(),
Image: psh.GetServiceImageName(constants.PersistenceTypeEphemeral),
Env: psh.GetEnvironmentVariables(),
Resources: psh.GetPodResourceRequirements(),
ReadinessProbe: readyProbe,
LivenessProbe: liveProbe,
Ports: []corev1.ContainerPort{
Expand All @@ -125,17 +125,17 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
},
},
}
dataDeployContainer = ps.ConfigurePersistence(dataDeployContainer)
dataDeployContainer, err := ps.MergeContainerSpec(dataDeployContainer)
dataDeployContainer = psh.ConfigurePersistence(dataDeployContainer)
dataDeployContainer, err := psh.MergeContainerSpec(dataDeployContainer)
if err != nil {
return err
}

// immutable
dataDeployContainer.Name = ps.GetContainerName()
dataDeployContainer.Name = psh.GetContainerName()

replicas := ps.GetReplicaCount()
lbl, selectorLbl := getLabels(platform, ps)
replicas := psh.GetReplicaCount()
lbl, selectorLbl := getLabels(platform, psh)
dataDeploySpec := appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: selectorLbl,
Expand All @@ -152,7 +152,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: ps.GetServiceCmName(),
Name: psh.GetServiceCmName(),
},
},
},
Expand All @@ -162,7 +162,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
},
}

dataDeploySpec.Template.Spec, err = ps.MergePodSpec(dataDeploySpec.Template.Spec)
dataDeploySpec.Template.Spec, err = psh.MergePodSpec(dataDeploySpec.Template.Spec)
if err != nil {
return err
}
Expand All @@ -171,7 +171,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
dataDeploy := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: platform.Namespace,
Name: ps.GetServiceName(),
Name: psh.GetServiceName(),
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform, dataDeploy, client.Scheme()); err != nil {
Expand All @@ -192,8 +192,8 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
return nil
}

func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
lbl, selectorLbl := getLabels(platform, ps)
func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
lbl, selectorLbl := getLabels(platform, psh)
dataSvcSpec := corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Expand All @@ -208,7 +208,7 @@ func createService(ctx context.Context, client client.Client, platform *operator
dataSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: platform.Namespace,
Name: ps.GetServiceName(),
Name: psh.GetServiceName(),
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform, dataSvc, client.Scheme()); err != nil {
Expand All @@ -229,26 +229,26 @@ func createService(ctx context.Context, client client.Client, platform *operator
return nil
}

func getLabels(platform *operatorapi.SonataFlowPlatform, ps services.Platform) (map[string]string, map[string]string) {
func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (map[string]string, map[string]string) {
lbl := map[string]string{
workflowproj.LabelApp: platform.Name,
workflowproj.LabelService: ps.GetServiceName(),
workflowproj.LabelService: psh.GetServiceName(),
}
selectorLbl := map[string]string{
workflowproj.LabelService: ps.GetServiceName(),
workflowproj.LabelService: psh.GetServiceName(),
}
return lbl, selectorLbl
}

func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
handler, err := services.NewServiceAppPropertyHandler(ps)
func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
handler, err := services.NewServiceAppPropertyHandler(psh)
if err != nil {
return err
}
lbl, _ := getLabels(platform, ps)
lbl, _ := getLabels(platform, psh)
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: ps.GetServiceCmName(),
Name: psh.GetServiceCmName(),
Namespace: platform.Namespace,
Labels: lbl,
},
Expand Down
18 changes: 10 additions & 8 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (

type serviceAppPropertyHandler struct {
userProperties string
platform Platform
serviceHandler PlatformServiceHandler
defaultMutableProperties *properties.Properties
}

Expand All @@ -59,9 +59,9 @@ type ServiceAppPropertyHandler interface {
// NewServiceAppPropertyHandler creates the default service configurations property handler
// The set of properties is initialized with the operator provided immutable properties.
// The set of defaultMutableProperties is initialized with the operator provided properties that the user might override.
func NewServiceAppPropertyHandler(ps Platform) (ServiceAppPropertyHandler, error) {
func NewServiceAppPropertyHandler(serviceHandler PlatformServiceHandler) (ServiceAppPropertyHandler, error) {
handler := &serviceAppPropertyHandler{}
props, err := ps.GenerateServiceProperties()
props, err := serviceHandler.GenerateServiceProperties()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func (a *serviceAppPropertyHandler) Build() string {
props, propErr = properties.LoadString(a.userProperties)
}
if propErr != nil {
klog.V(log.D).InfoS("Can't load user's property", "service", a.platform.GetServiceName(), "properties", a.userProperties)
klog.V(log.D).InfoS("Can't load user's property", "service", a.serviceHandler.GetServiceName(), "properties", a.userProperties)
props = properties.NewProperties()
}
props = utils.NewApplicationPropertiesBuilder().
Expand Down Expand Up @@ -158,10 +158,12 @@ func generateReactiveURL(postgresSpec *operatorapi.PersistencePostgreSql, schema
// Never nil.
func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.KogitoProcessInstancesEnabled, "false")
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "false")
if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) {
props.Set(constants.KogitoProcessInstancesEnabled, "true")
di := NewDataIndexService(platform)
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand All @@ -182,7 +184,7 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP)
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) {
js := NewJobService(platform)
js := NewJobServiceHandler(platform)
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand Down
18 changes: 11 additions & 7 deletions controllers/platform/services/properties_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
package services

import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"

"github.com/magiconair/properties"
)

Expand All @@ -33,23 +32,23 @@ var (
disabled = false
)

var _ = Describe("Platform properties", func() {
var _ = Describe("PlatformServiceHandler properties", func() {

var _ = Context("for service properties", func() {

var _ = Context("defining the application properties generated for the deployment of the", func() {

DescribeTable("Job Service",
func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
js := NewJobService(plfm)
js := NewJobServiceHandler(plfm)
handler, err := NewServiceAppPropertyHandler(js)
Expect(err).NotTo(HaveOccurred())
p, err := properties.LoadString(handler.Build())
Expect(err).NotTo(HaveOccurred())
p.Sort()
Expect(p).To(Equal(expectedProperties))
},
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec()),
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")),
generateJobServiceDeploymentDevProperties()),
Entry("with enabled field undefined and with ephemeral persistence",
generatePlatform(setJobServiceEnabledValue(nil), setPlatformName("foo"), setPlatformNamespace("default")),
Expand Down Expand Up @@ -78,15 +77,15 @@ var _ = Describe("Platform properties", func() {
)

DescribeTable("Data Index", func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
di := NewDataIndexService(plfm)
di := NewDataIndexHandler(plfm)
handler, err := NewServiceAppPropertyHandler(di)
Expect(err).NotTo(HaveOccurred())
p, err := properties.LoadString(handler.Build())
Expect(err).NotTo(HaveOccurred())
p.Sort()
Expect(p).To(Equal(expectedProperties))
},
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec()), generateDataIndexDeploymentProperties()),
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")), generateDataIndexDeploymentProperties()),
Entry("with postgreSQL persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default"), setJobServiceJDBC("jdbc:postgresql://postgres:5432/sonataflow?currentSchema=myschema")),
generateDataIndexDeploymentProperties()),
)
Expand All @@ -98,6 +97,7 @@ var _ = Describe("Platform properties", func() {

func generateJobServiceDeploymentDevProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -109,6 +109,7 @@ func generateJobServiceDeploymentDevProperties() *properties.Properties {

func generateDataIndexDeploymentProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-data-index-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -120,6 +121,7 @@ func generateDataIndexDeploymentProperties() *properties.Properties {

func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -132,6 +134,7 @@ func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properti

func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
p.Set("quarkus.devservices.enabled", "false")
Expand All @@ -145,6 +148,7 @@ func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properti

func generateJobServiceDeploymentWithDataIndexAndPostgreSQLProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
p.Set("quarkus.devservices.enabled", "false")
Expand Down
Loading

0 comments on commit 14e27e5

Please sign in to comment.