Skip to content

Commit

Permalink
KOGITO-9867: Operator driven service discovery API
Browse files Browse the repository at this point in the history
    - Integrate the service discovery API
  • Loading branch information
wmedvede committed Nov 10, 2023
1 parent 1eb14df commit d3ed1e0
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 10 deletions.
4 changes: 2 additions & 2 deletions controllers/discovery/uri_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
"|" + kubernetesStatefulSets +
"|" + kubernetesIngresses + ")"

knativeGroupsPattern = "^(knative:|" + knativeServices + ")"
knativeGroupsPattern = "^(" + knativeServices + ")"

openshiftGroupsPattern = "^(" + openshiftDeploymentConfigs +
"|" + openshiftRoutes + ")"
Expand All @@ -52,7 +52,7 @@ func ParseUri(uri string) (*ResourceUri, error) {
return parseKubernetesUri(uri, kubernetesGroupsExpr.FindString(uri), split[1])
} else if split := knativeGroupsExpr.Split(uri, -1); len(split) == 2 {
return parseKnativeUri(knativeGroupsExpr.FindString(uri), split[1])
} else if split := openshiftGroupsExpr.Split(uri, -1); len(split[1]) == 2 {
} else if split := openshiftGroupsExpr.Split(uri, -1); len(split) == 2 {
return parseOpenshiftUri(openshiftGroupsExpr.FindString(uri), split[1])
}
return nil, fmt.Errorf("invalid uri: %s, not correspond to any of the available schemes format: %s, %s, %s", uri, KubernetesScheme, KnativeScheme, OpenshiftScheme)
Expand Down
101 changes: 101 additions & 0 deletions controllers/profiles/common/app_properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@
package common

import (
"context"
"fmt"

"regexp"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"

"github.com/magiconair/properties"

"k8s.io/klog/v2"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
Expand All @@ -37,6 +44,9 @@ const (

PersistenceTypeEphemeral = "ephemeral"
PersistenceTypePostgressql = "postgresql"

microprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
discoveryLikePropertyPattern = "^\\${(kubernetes|knative|openshift):(.*)}$"
)

var immutableApplicationProperties = "quarkus.http.port=" + DefaultHTTPWorkflowPortIntStr.String() + "\n" +
Expand All @@ -47,16 +57,21 @@ var immutableApplicationProperties = "quarkus.http.port=" + DefaultHTTPWorkflowP
"quarkus.devservices.enabled=false\n" +
"quarkus.kogito.devservices.enabled=false\n"

var discoveryLikePropertyExpr = regexp.MustCompile(discoveryLikePropertyPattern)

var _ AppPropertyHandler = &appPropertyHandler{}

type AppPropertyHandler interface {
WithUserProperties(userProperties string) AppPropertyHandler
WithServiceDiscovery(ctx context.Context, catalog discovery.ServiceCatalog) AppPropertyHandler
Build() string
}

type appPropertyHandler struct {
workflow *operatorapi.SonataFlow
platform *operatorapi.SonataFlowPlatform
catalog discovery.ServiceCatalog
ctx context.Context
userProperties string
defaultMutableProperties string
}
Expand All @@ -66,6 +81,12 @@ func (a *appPropertyHandler) WithUserProperties(properties string) AppPropertyHa
return a
}

func (a *appPropertyHandler) WithServiceDiscovery(ctx context.Context, catalog discovery.ServiceCatalog) AppPropertyHandler {
a.ctx = ctx
a.catalog = catalog
return a
}

func (a *appPropertyHandler) Build() string {
var props *properties.Properties
var propErr error = nil
Expand All @@ -82,6 +103,16 @@ func (a *appPropertyHandler) Build() string {
// Disable expansions since it's not our responsibility
// Property expansion means resolving ${} within the properties and environment context. Quarkus will do that in runtime.
props.DisableExpansion = true

removeDiscoveryProperties(props)
if a.requireServiceDiscovery() {
// produce the MicroProfileConfigServiceCatalog properties for the service discovery property values if any.
discoveryProperties := generateDiscoveryProperties(a.ctx, a.catalog, props, a.workflow)
if discoveryProperties.Len() > 0 {
props.Merge(discoveryProperties)
}
}

defaultMutableProps := properties.MustLoadString(a.defaultMutableProperties)
for _, k := range defaultMutableProps.Keys() {
if _, ok := props.Get(k); ok {
Expand Down Expand Up @@ -152,3 +183,73 @@ func ImmutableApplicationProperties(workflow *operatorapi.SonataFlow, platform *
func GetDataIndexName(platform *operatorapi.SonataFlowPlatform) string {
return platform.Name + "-" + DataIndexName
}

func (a *appPropertyHandler) requireServiceDiscovery() bool {
return a.ctx != nil && a.catalog != nil
}

// generateDiscoveryProperties Given a user configured properties set, generates the MicroProfileConfigServiceCatalog
// required properties to resolve the corresponding service addresses base on these properties.
// e.g.
// Given a user configured property like this:
//
// quarkus.rest-client.acme_financial_service_yml.url=${kubernetes:services.v1/usecase1/financial-service?port=http-port}
//
// generates the following property:
//
// org.kie.kogito.addons.discovery.kubernetes\:services.v1\/usecase1\/financial-service?port\=http-port=http://10.5.9.1:8080
//
// where http://10.5.9.1:8080 is the corresponding k8s cloud address for the service financial-service in the namespace usecase1.
func generateDiscoveryProperties(ctx context.Context, catalog discovery.ServiceCatalog, props *properties.Properties,
workflow *operatorapi.SonataFlow) *properties.Properties {
klog.V(log.I).Infof("Generating service discovery properties for workflow: %s, and namespace: %s.", workflow.Name, workflow.Namespace)
result := properties.NewProperties()
props.DisableExpansion = true
for _, k := range props.Keys() {
value, _ := props.Get(k)
klog.V(log.I).Infof("Scanning property %s=%s for service discovery configuration.", k, value)
if !discoveryLikePropertyExpr.MatchString(value) {
klog.V(log.I).Infof("Skipping property %s=%s since it does not look like a service discovery configuration.", k, value)
} else {
klog.V(log.I).Infof("Property %s=%s looks like a service discovery configuration.", k, value)
plainUri := value[2 : len(value)-1]
if uri, err := discovery.ParseUri(plainUri); err != nil {
klog.V(log.I).Infof("Property %s=%s not correspond to a valid service discovery configuration, it will be excluded from service discovery.", k, value)
} else {
if len(uri.Namespace) == 0 {
klog.V(log.I).Infof("Current service discovery configuration has no configured namespace, workflow namespace: %s will be used instead.", workflow.Namespace)
uri.Namespace = workflow.Namespace
}
if address, err := catalog.Query(ctx, *uri, discovery.KubernetesDNSAddress); err != nil {
klog.V(log.E).ErrorS(err, "An error was produced during service address resolution.", "serviceUri", plainUri)
} else {
klog.V(log.I).Infof("Service: %s was resolved into the following address: %s.", plainUri, address)
mpProperty := generateMicroprofileServiceCatalogProperty(plainUri)
klog.V(log.I).Infof("Generating microprofile service catalog property %s=%s.", mpProperty, address)
result.MustSet(mpProperty, address)
}
}
}
}
return result
}

func removeDiscoveryProperties(props *properties.Properties) {
for _, k := range props.Keys() {
if strings.HasPrefix(k, microprofileServiceCatalogPropertyPrefix) {
props.Delete(k)
}
}
}

func generateMicroprofileServiceCatalogProperty(serviceUri string) string {
escapedServiceUri := escapeValue(serviceUri, ":")
escapedServiceUri = escapeValue(escapedServiceUri, "/")
escapedServiceUri = escapeValue(escapedServiceUri, "=")
property := microprofileServiceCatalogPropertyPrefix + escapedServiceUri
return property
}

func escapeValue(unescaped string, value string) string {
return strings.Replace(unescaped, value, fmt.Sprintf("\\%s", value), -1)
}
122 changes: 122 additions & 0 deletions controllers/profiles/common/app_properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
package common

import (
"context"
"fmt"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"

"github.com/magiconair/properties"

"github.com/stretchr/testify/assert"
Expand All @@ -26,6 +32,33 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
)

const (
defaultNamespace = "default-namespace"
namespace1 = "namespace1"
myService1 = "my-service1"
myService1Address = "http://10.110.90.1:80"
myService2 = "my-service2"
myService2Address = "http://10.110.90.2:80"
myService3 = "my-service3"
myService3Address = "http://10.110.90.3:80"
)

type mockCatalogService struct {
}

func (c *mockCatalogService) Query(ctx context.Context, uri discovery.ResourceUri, outputFormat string) (string, error) {
if uri.Scheme == discovery.KubernetesScheme && uri.Namespace == namespace1 && uri.Name == myService1 {
return myService1Address, nil
}
if uri.Scheme == discovery.KubernetesScheme && uri.Name == myService2 && uri.Namespace == defaultNamespace {
return myService2Address, nil
}
if uri.Scheme == discovery.KubernetesScheme && uri.Name == myService3 && uri.Namespace == defaultNamespace && uri.GetPort() == "http-port" {
return myService3Address, nil
}
return "", nil
}

func Test_appPropertyHandler_WithKogitoServiceUrl(t *testing.T) {
workflow := test.GetBaseSonataFlow("default")
props := ImmutableApplicationProperties(workflow, nil)
Expand Down Expand Up @@ -99,3 +132,92 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) {
assert.Equal(t, 8, len(generatedProps.Keys()))
assert.Equal(t, "", generatedProps.GetString(dataIndexServiceUrlProperty, ""))
}

func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T) {
//just add some user provided properties, no overrides.
userProperties := "property1=value1\nproperty2=value2\n"
//add some user properties that requires service discovery
userProperties = userProperties + "service1=${kubernetes:services.v1/namespace1/my-service1}\n"
userProperties = userProperties + "service2=${kubernetes:services.v1/my-service2}\n"

workflow := test.GetBaseSonataFlow(defaultNamespace)
props := NewAppPropertyHandler(workflow, nil).
WithUserProperties(userProperties).
WithServiceDiscovery(context.TODO(), &mockCatalogService{}).
Build()
generatedProps, propsErr := properties.LoadString(props)
generatedProps.DisableExpansion = true
assert.NoError(t, propsErr)
assert.Equal(t, 12, len(generatedProps.Keys()))
assertHasProperty(t, generatedProps, "property1", "value1")
assertHasProperty(t, generatedProps, "property2", "value2")

assertHasProperty(t, generatedProps, "service1", "${kubernetes:services.v1/namespace1/my-service1}")
assertHasProperty(t, generatedProps, "service2", "${kubernetes:services.v1/my-service2}")
//org.kie.kogito.addons.discovery.kubernetes\:services.v1\/usecase1º/my-service1 below we use the unescaped vale because the properties.LoadString removes them.
assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.kubernetes:services.v1/namespace1/my-service1", myService1Address)
//org.kie.kogito.addons.discovery.kubernetes\:services.v1\/my-service2 below we use the unescaped vale because the properties.LoadString removes them.
assertHasProperty(t, generatedProps, "org.kie.kogito.addons.discovery.kubernetes:services.v1/my-service2", myService2Address)

assertHasProperty(t, generatedProps, "kogito.service.url", fmt.Sprintf("http://greeting.%s", defaultNamespace))
assertHasProperty(t, generatedProps, "quarkus.http.port", "8080")
assertHasProperty(t, generatedProps, "quarkus.http.host", "0.0.0.0")
assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false")
assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false")
assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false")
}

func Test_generateDiscoveryProperties(t *testing.T) {

catalogService := &mockCatalogService{}

propertiesContent := "property1=value1\n"
propertiesContent = propertiesContent + "property2=${value2}\n"
propertiesContent = propertiesContent + "service1=${kubernetes:services.v1/namespace1/my-service1}\n"
propertiesContent = propertiesContent + "service2=${kubernetes:services.v1/my-service2}\n"
propertiesContent = propertiesContent + "service3=${kubernetes:services.v1/my-service3?port=http-port}\n"

propertiesContent = propertiesContent + "non_service4=${kubernetes:--kaka}"

props := properties.MustLoadString(propertiesContent)
result := generateDiscoveryProperties(context.TODO(), catalogService, props, &operatorapi.SonataFlow{
ObjectMeta: metav1.ObjectMeta{Name: "helloworld", Namespace: defaultNamespace},
})

assert.Equal(t, result.Len(), 3)
assertHasProperty(t, result, "org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/namespace1\\/my-service1", myService1Address)
assertHasProperty(t, result, "org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/my-service2", myService2Address)
assertHasProperty(t, result, "org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/my-service3?port\\=http-port", myService3Address)
}

func assertHasProperty(t *testing.T, props *properties.Properties, expectedProperty string, expectedValue string) {
value, ok := props.Get(expectedProperty)
assert.True(t, ok, "Property %s, is not present as expected.", expectedProperty)
assert.Equal(t, expectedValue, value, "Expected value for property: %s, is: %s but current value is: %s", expectedProperty, expectedValue, value)
}

func Test_generateMicroprofileServiceCatalogProperty(t *testing.T) {

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:services.v1/namespace1/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/namespace1\\/financial-service")

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:services.v1/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:services.v1\\/financial-service")

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:pods.v1/namespace1/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:pods.v1\\/namespace1\\/financial-service")

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:pods.v1/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:pods.v1\\/financial-service")

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:deployments.v1.apps/namespace1/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:deployments.v1.apps\\/namespace1\\/financial-service")

doTestGenerateMicroprofileServiceCatalogProperty(t, "kubernetes:deployments.v1.apps/financial-service",
"org.kie.kogito.addons.discovery.kubernetes\\:deployments.v1.apps\\/financial-service")
}

func doTestGenerateMicroprofileServiceCatalogProperty(t *testing.T, serviceUri string, expectedProperty string) {
mpProperty := generateMicroprofileServiceCatalogProperty(serviceUri)
assert.Equal(t, mpProperty, expectedProperty, "expected microprofile service catalog property for serviceUri: %s, is %s, but the returned value was: %s", serviceUri, expectedProperty, mpProperty)
}
7 changes: 6 additions & 1 deletion controllers/profiles/common/mutate_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package common

import (
"context"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"
"github.com/imdario/mergo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -94,7 +97,8 @@ func ServiceMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
}
}

func WorkflowPropertiesMutateVisitor(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) MutateVisitor {
func WorkflowPropertiesMutateVisitor(ctx context.Context, catalog discovery.ServiceCatalog,
workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) MutateVisitor {
return func(object client.Object) controllerutil.MutateFn {
return func() error {
if kubeutil.IsObjectNew(object) {
Expand All @@ -113,6 +117,7 @@ func WorkflowPropertiesMutateVisitor(workflow *operatorapi.SonataFlow, platform
cm.Data[workflowproj.ApplicationPropertiesFileName] =
NewAppPropertyHandler(workflow, platform).
WithUserProperties(cm.Data[workflowproj.ApplicationPropertiesFileName]).
WithServiceDiscovery(ctx, catalog).
Build()

return nil
Expand Down
4 changes: 2 additions & 2 deletions controllers/profiles/common/object_creators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Test_ensureWorkflowPropertiesConfigMapMutator(t *testing.T) {
cm.SetResourceVersion("1")
reflectCm := cm.(*corev1.ConfigMap)

visitor := WorkflowPropertiesMutateVisitor(workflow, nil)
visitor := WorkflowPropertiesMutateVisitor(nil, nil, workflow, nil)
mutateFn := visitor(cm)

assert.NoError(t, mutateFn())
Expand Down Expand Up @@ -72,7 +72,7 @@ func Test_ensureWorkflowPropertiesConfigMapMutator_DollarReplacement(t *testing.
workflowproj.ApplicationPropertiesFileName: "mp.messaging.outgoing.kogito_outgoing_stream.url=${kubernetes:services.v1/event-listener}",
},
}
mutateVisitorFn := WorkflowPropertiesMutateVisitor(workflow, nil)
mutateVisitorFn := WorkflowPropertiesMutateVisitor(nil, nil, workflow, nil)

err := mutateVisitorFn(existingCM)()
assert.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion controllers/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery"

"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -29,7 +31,8 @@ import (

// StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles
type StateSupport struct {
C client.Client
C client.Client
Catalog discovery.ServiceCatalog
}

// PerformStatusUpdate updates the SonataFlow Status conditions
Expand Down
2 changes: 1 addition & 1 deletion controllers/profiles/dev/states_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora
if err == nil && len(pl.Spec.DevMode.BaseImage) > 0 {
devBaseContainerImage = pl.Spec.DevMode.BaseImage
}
propsCM, _, err := e.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(workflow, pl))
propsCM, _, err := e.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(ctx, e.StateSupport.Catalog, workflow, pl))
if err != nil {
return ctrl.Result{Requeue: false}, objs, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/profiles/prod/deployment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *deploymentHandler) handle(ctx context.Context, workflow *operatorapi.So

func (d *deploymentHandler) handleWithImage(ctx context.Context, workflow *operatorapi.SonataFlow, image string) (reconcile.Result, []client.Object, error) {
pl, _ := platform.GetActivePlatform(ctx, d.C, workflow.Namespace)
propsCM, _, err := d.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(workflow, pl))
propsCM, _, err := d.ensurers.propertiesConfigMap.Ensure(ctx, workflow, common.WorkflowPropertiesMutateVisitor(ctx, d.StateSupport.Catalog, workflow, pl))
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.ExternalResourcesNotFoundReason, "Unable to retrieve the properties config map")
_, err = d.PerformStatusUpdate(ctx, workflow)
Expand Down
Loading

0 comments on commit d3ed1e0

Please sign in to comment.