Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kogito-serverless-operator-353: Enable the sending of the process definition event when the data-index is present #358

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,31 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
}

if platform.Spec.Services.DataIndex != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexService(platform)); err != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewDataIndexHandler(platform)); err != nil {
return nil, err
}
}

if platform.Spec.Services.JobService != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewJobService(platform)); err != nil {
if err := createServiceComponents(ctx, action.client, platform, services.NewJobServiceHandler(platform)); err != nil {
return nil, err
}
}

return platform, nil
}

func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
if err := createConfigMap(ctx, client, platform, ps); err != nil {
func createServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
if err := createConfigMap(ctx, client, platform, psh); err != nil {
return err
}
if err := createDeployment(ctx, client, platform, ps); err != nil {
if err := createDeployment(ctx, client, platform, psh); err != nil {
return err
}
return createService(ctx, client, platform, ps)
return createService(ctx, client, platform, psh)
}

func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
func createDeployment(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
readyProbe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Expand All @@ -105,9 +105,9 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
liveProbe := readyProbe.DeepCopy()
liveProbe.ProbeHandler.HTTPGet.Path = common.QuarkusHealthPathLive
dataDeployContainer := &corev1.Container{
Image: ps.GetServiceImageName(constants.PersistenceTypeEphemeral),
Env: ps.GetEnvironmentVariables(),
Resources: ps.GetPodResourceRequirements(),
Image: psh.GetServiceImageName(constants.PersistenceTypeEphemeral),
Env: psh.GetEnvironmentVariables(),
Resources: psh.GetPodResourceRequirements(),
ReadinessProbe: readyProbe,
LivenessProbe: liveProbe,
Ports: []corev1.ContainerPort{
Expand All @@ -125,17 +125,17 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
},
},
}
dataDeployContainer = ps.ConfigurePersistence(dataDeployContainer)
dataDeployContainer, err := ps.MergeContainerSpec(dataDeployContainer)
dataDeployContainer = psh.ConfigurePersistence(dataDeployContainer)
dataDeployContainer, err := psh.MergeContainerSpec(dataDeployContainer)
if err != nil {
return err
}

// immutable
dataDeployContainer.Name = ps.GetContainerName()
dataDeployContainer.Name = psh.GetContainerName()

replicas := ps.GetReplicaCount()
lbl, selectorLbl := getLabels(platform, ps)
replicas := psh.GetReplicaCount()
lbl, selectorLbl := getLabels(platform, psh)
dataDeploySpec := appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: selectorLbl,
Expand All @@ -152,7 +152,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: ps.GetServiceCmName(),
Name: psh.GetServiceCmName(),
},
},
},
Expand All @@ -162,7 +162,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
},
}

dataDeploySpec.Template.Spec, err = ps.MergePodSpec(dataDeploySpec.Template.Spec)
dataDeploySpec.Template.Spec, err = psh.MergePodSpec(dataDeploySpec.Template.Spec)
if err != nil {
return err
}
Expand All @@ -171,7 +171,7 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
dataDeploy := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: platform.Namespace,
Name: ps.GetServiceName(),
Name: psh.GetServiceName(),
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform, dataDeploy, client.Scheme()); err != nil {
Expand All @@ -192,8 +192,8 @@ func createDeployment(ctx context.Context, client client.Client, platform *opera
return nil
}

func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
lbl, selectorLbl := getLabels(platform, ps)
func createService(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
lbl, selectorLbl := getLabels(platform, psh)
dataSvcSpec := corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Expand All @@ -208,7 +208,7 @@ func createService(ctx context.Context, client client.Client, platform *operator
dataSvc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: platform.Namespace,
Name: ps.GetServiceName(),
Name: psh.GetServiceName(),
Labels: lbl,
}}
if err := controllerutil.SetControllerReference(platform, dataSvc, client.Scheme()); err != nil {
Expand All @@ -229,26 +229,26 @@ func createService(ctx context.Context, client client.Client, platform *operator
return nil
}

func getLabels(platform *operatorapi.SonataFlowPlatform, ps services.Platform) (map[string]string, map[string]string) {
func getLabels(platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (map[string]string, map[string]string) {
lbl := map[string]string{
workflowproj.LabelApp: platform.Name,
workflowproj.LabelService: ps.GetServiceName(),
workflowproj.LabelService: psh.GetServiceName(),
}
selectorLbl := map[string]string{
workflowproj.LabelService: ps.GetServiceName(),
workflowproj.LabelService: psh.GetServiceName(),
}
return lbl, selectorLbl
}

func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, ps services.Platform) error {
handler, err := services.NewServiceAppPropertyHandler(ps)
func createConfigMap(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
handler, err := services.NewServiceAppPropertyHandler(psh)
if err != nil {
return err
}
lbl, _ := getLabels(platform, ps)
lbl, _ := getLabels(platform, psh)
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: ps.GetServiceCmName(),
Name: psh.GetServiceCmName(),
Namespace: platform.Namespace,
Labels: lbl,
},
Expand Down
18 changes: 10 additions & 8 deletions controllers/platform/services/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (

type serviceAppPropertyHandler struct {
userProperties string
platform Platform
serviceHandler PlatformServiceHandler
defaultMutableProperties *properties.Properties
}

Expand All @@ -59,9 +59,9 @@ type ServiceAppPropertyHandler interface {
// NewServiceAppPropertyHandler creates the default service configurations property handler
// The set of properties is initialized with the operator provided immutable properties.
// The set of defaultMutableProperties is initialized with the operator provided properties that the user might override.
func NewServiceAppPropertyHandler(ps Platform) (ServiceAppPropertyHandler, error) {
func NewServiceAppPropertyHandler(serviceHandler PlatformServiceHandler) (ServiceAppPropertyHandler, error) {
handler := &serviceAppPropertyHandler{}
props, err := ps.GenerateServiceProperties()
props, err := serviceHandler.GenerateServiceProperties()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func (a *serviceAppPropertyHandler) Build() string {
props, propErr = properties.LoadString(a.userProperties)
}
if propErr != nil {
klog.V(log.D).InfoS("Can't load user's property", "service", a.platform.GetServiceName(), "properties", a.userProperties)
klog.V(log.D).InfoS("Can't load user's property", "service", a.serviceHandler.GetServiceName(), "properties", a.userProperties)
props = properties.NewProperties()
}
props = utils.NewApplicationPropertiesBuilder().
Expand Down Expand Up @@ -158,10 +158,12 @@ func generateReactiveURL(postgresSpec *operatorapi.PersistencePostgreSql, schema
// Never nil.
func GenerateDataIndexWorkflowProperties(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (*properties.Properties, error) {
props := properties.NewProperties()
props.Set(constants.KogitoProcessInstancesEnabled, "false")
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "false")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "false")
if workflow != nil && !profiles.IsDevProfile(workflow) && dataIndexEnabled(platform) {
props.Set(constants.KogitoProcessInstancesEnabled, "true")
di := NewDataIndexService(platform)
props.Set(constants.KogitoProcessDefinitionsEventsEnabled, "true")
props.Set(constants.KogitoProcessInstancesEventsEnabled, "true")
di := NewDataIndexHandler(platform)
p, err := di.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand All @@ -182,7 +184,7 @@ func GenerateJobServiceWorkflowProperties(workflow *operatorapi.SonataFlow, plat
props.Set(constants.JobServiceRequestEventsConnector, constants.QuarkusHTTP)
props.Set(constants.JobServiceRequestEventsURL, fmt.Sprintf("%s://localhost/v2/jobs/events", constants.JobServiceURLProtocol))
if workflow != nil && !profiles.IsDevProfile(workflow) && jobServiceEnabled(platform) {
js := NewJobService(platform)
js := NewJobServiceHandler(platform)
p, err := js.GenerateWorkflowProperties()
if err != nil {
return nil, err
Expand Down
18 changes: 11 additions & 7 deletions controllers/platform/services/properties_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
package services

import (
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"

"github.com/magiconair/properties"
)

Expand All @@ -33,23 +32,23 @@ var (
disabled = false
)

var _ = Describe("Platform properties", func() {
var _ = Describe("PlatformServiceHandler properties", func() {

var _ = Context("for service properties", func() {

var _ = Context("defining the application properties generated for the deployment of the", func() {

DescribeTable("Job Service",
func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
js := NewJobService(plfm)
js := NewJobServiceHandler(plfm)
handler, err := NewServiceAppPropertyHandler(js)
Expect(err).NotTo(HaveOccurred())
p, err := properties.LoadString(handler.Build())
Expect(err).NotTo(HaveOccurred())
p.Sort()
Expect(p).To(Equal(expectedProperties))
},
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec()),
Entry("with an empty spec", generatePlatform(emptyJobServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")),
generateJobServiceDeploymentDevProperties()),
Entry("with enabled field undefined and with ephemeral persistence",
generatePlatform(setJobServiceEnabledValue(nil), setPlatformName("foo"), setPlatformNamespace("default")),
Expand Down Expand Up @@ -78,15 +77,15 @@ var _ = Describe("Platform properties", func() {
)

DescribeTable("Data Index", func(plfm *operatorapi.SonataFlowPlatform, expectedProperties *properties.Properties) {
di := NewDataIndexService(plfm)
di := NewDataIndexHandler(plfm)
handler, err := NewServiceAppPropertyHandler(di)
Expect(err).NotTo(HaveOccurred())
p, err := properties.LoadString(handler.Build())
Expect(err).NotTo(HaveOccurred())
p.Sort()
Expect(p).To(Equal(expectedProperties))
},
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec()), generateDataIndexDeploymentProperties()),
Entry("with ephemeral persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default")), generateDataIndexDeploymentProperties()),
Entry("with postgreSQL persistence", generatePlatform(emptyDataIndexServiceSpec(), setPlatformName("foo"), setPlatformNamespace("default"), setJobServiceJDBC("jdbc:postgresql://postgres:5432/sonataflow?currentSchema=myschema")),
generateDataIndexDeploymentProperties()),
)
Expand All @@ -98,6 +97,7 @@ var _ = Describe("Platform properties", func() {

func generateJobServiceDeploymentDevProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -109,6 +109,7 @@ func generateJobServiceDeploymentDevProperties() *properties.Properties {

func generateDataIndexDeploymentProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-data-index-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -120,6 +121,7 @@ func generateDataIndexDeploymentProperties() *properties.Properties {

func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("quarkus.devservices.enabled", "false")
p.Set("quarkus.http.host", "0.0.0.0")
p.Set("quarkus.http.port", "8080")
Expand All @@ -132,6 +134,7 @@ func generateJobServiceDeploymentWithPostgreSQLProperties() *properties.Properti

func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
p.Set("quarkus.devservices.enabled", "false")
Expand All @@ -145,6 +148,7 @@ func generateJobServiceDeploymentWithDataIndexAndEphemeralProperties() *properti

func generateJobServiceDeploymentWithDataIndexAndPostgreSQLProperties() *properties.Properties {
p := properties.NewProperties()
p.Set("kogito.service.url", "http://foo-jobs-service.default")
p.Set("kogito.jobs-service.http.job-status-change-events", "true")
p.Set("mp.messaging.outgoing.kogito-job-service-job-status-events-http.url", "http://foo-data-index-service.default/jobs")
p.Set("quarkus.devservices.enabled", "false")
Expand Down
Loading