Skip to content

Commit

Permalink
Break the current operator's configuration into custom and managed pr…
Browse files Browse the repository at this point in the history
…operties (apache#367)

* Initial commit

* integrating comments: introducing ObjectEnsurerWithPlatform and ObjectCreatorWithPlatform

* Removing fewe more unneeded references to platform

* reviewed workflowProjectHandler. removed user props from managed props

* fixed unit tests

* workarond for failed discovery options

* fixed broken unit tests

* fixed mutator for user props

* reviewed hashing function

* Anticipating deactivation of broken e2e test

* integrating comments: removing unneeded comment

* adding discovered value to properties whose value mathes the service discovery pattern

* removed unused package common_test

* Renamed managed props visitor and reviewed description of NewAppPropertyHandler
  • Loading branch information
dmartinol authored and rgdoliveira committed Feb 5, 2024
1 parent f40250e commit 2964c1f
Show file tree
Hide file tree
Showing 21 changed files with 312 additions and 168 deletions.
39 changes: 39 additions & 0 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var _ ObjectEnsurer = &noopObjectEnsurer{}
type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
}
type ObjectEnsurerWithPlatform interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
}

// MutateVisitor is a visitor function that mutates the given object before performing any updates in the cluster.
// It gets called after the objectEnforcer reference.
Expand All @@ -56,6 +59,14 @@ func NewObjectEnsurer(client client.Client, creator ObjectCreator) ObjectEnsurer
}
}

// NewObjectEnsurerWithPlatform see defaultObjectEnsurerWithPLatform
func NewObjectEnsurerWithPlatform(client client.Client, creator ObjectCreatorWithPlatform) ObjectEnsurerWithPlatform {
return &defaultObjectEnsurerWithPlatform{
c: client,
creator: creator,
}
}

// defaultObjectEnsurer provides the engine for a ReconciliationState that needs to create or update a given Kubernetes object during the reconciliation cycle.
type defaultObjectEnsurer struct {
c client.Client
Expand Down Expand Up @@ -84,6 +95,34 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi
return object, result, nil
}

// defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform
type defaultObjectEnsurerWithPlatform struct {
c client.Client
creator ObjectCreatorWithPlatform
}

func (d *defaultObjectEnsurerWithPlatform) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) {
result := controllerutil.OperationResultNone

object, err := d.creator(workflow, pl)
if err != nil {
return nil, result, err
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
}

// NewNoopObjectEnsurer see noopObjectEnsurer
func NewNoopObjectEnsurer() ObjectEnsurer {
return &noopObjectEnsurer{}
Expand Down
32 changes: 14 additions & 18 deletions controllers/profiles/common/mutate_visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,32 +105,28 @@ func ServiceMutateVisitor(workflow *operatorapi.SonataFlow) MutateVisitor {
}
}

func WorkflowPropertiesMutateVisitor(ctx context.Context, catalog discovery.ServiceCatalog,
workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) MutateVisitor {
func ManagedPropertiesMutateVisitor(ctx context.Context, catalog discovery.ServiceCatalog,
workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform, userProps *corev1.ConfigMap) MutateVisitor {
return func(object client.Object) controllerutil.MutateFn {
return func() error {
if kubeutil.IsObjectNew(object) {
return nil
}
cm := object.(*corev1.ConfigMap)
cm.Labels = workflow.GetLabels()
_, hasKey := cm.Data[workflowproj.ApplicationPropertiesFileName]
managedProps := object.(*corev1.ConfigMap)
managedProps.Labels = workflow.GetLabels()
_, hasKey := managedProps.Data[workflowproj.GetManagedPropertiesFileName(workflow)]
if !hasKey {
cm.Data = make(map[string]string, 1)
props, err := properties.ImmutableApplicationProperties(workflow, platform)
if err != nil {
return err
}
cm.Data[workflowproj.ApplicationPropertiesFileName] = props
return nil
managedProps.Data = make(map[string]string, 1)
managedProps.Data[workflowproj.GetManagedPropertiesFileName(workflow)] = ""
}

userProperties, hasKey := userProps.Data[workflowproj.ApplicationPropertiesFileName]
if !hasKey {
userProperties = ""
}
// In the future, if this needs change, instead we can receive an AppPropertyHandler in this mutator
props, err := properties.NewAppPropertyHandler(workflow, platform)
if err != nil {
return err
}
cm.Data[workflowproj.ApplicationPropertiesFileName] = props.WithUserProperties(cm.Data[workflowproj.ApplicationPropertiesFileName]).
managedProps.Data[workflowproj.GetManagedPropertiesFileName(workflow)] = props.WithUserProperties(userProperties).
WithServiceDiscovery(ctx, catalog).
Build()
return nil
Expand All @@ -142,11 +138,11 @@ func WorkflowPropertiesMutateVisitor(ctx context.Context, catalog discovery.Serv
// This method can be used as an alternative to the Kubernetes ConfigMap refresher.
//
// See: https://kubernetes.io/docs/concepts/configuration/configmap/#mounted-configmaps-are-updated-automatically
func RolloutDeploymentIfCMChangedMutateVisitor(cm *v1.ConfigMap) MutateVisitor {
func RolloutDeploymentIfCMChangedMutateVisitor(workflow *operatorapi.SonataFlow, userPropsCM *v1.ConfigMap, managedPropsCM *v1.ConfigMap) MutateVisitor {
return func(object client.Object) controllerutil.MutateFn {
return func() error {
deployment := object.(*appsv1.Deployment)
err := kubeutil.AnnotateDeploymentConfigChecksum(deployment, cm)
err := kubeutil.AnnotateDeploymentConfigChecksum(workflow, deployment, userPropsCM, managedPropsCM)
return err
}
}
Expand Down
17 changes: 13 additions & 4 deletions controllers/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import (
// Can be used as a reference to keep the object immutable
type ObjectCreator func(workflow *operatorapi.SonataFlow) (client.Object, error)

// ObjectCreatorWithPlatform is the func equivalent to ObjectCreator to use when the resource being created needs a reference to the
// SonataFlowPlatform
type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error)

const (
defaultHTTPServicePort = 80

Expand Down Expand Up @@ -209,13 +213,18 @@ func OpenShiftRouteCreator(workflow *operatorapi.SonataFlow) (client.Object, err
return route, err
}

// WorkflowPropsConfigMapCreator creates a ConfigMap to hold the external application properties
func WorkflowPropsConfigMapCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
props, err := properties.ImmutableApplicationProperties(workflow, nil)
// UserPropsConfigMapCreator creates an empty ConfigMap to hold the user application properties
func UserPropsConfigMapCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
return workflowproj.CreateNewUserPropsConfigMap(workflow), nil
}

// ManagedPropsConfigMapCreator creates an empty ConfigMap to hold the external application properties
func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error) {
props, err := properties.ImmutableApplicationProperties(workflow, platform)
if err != nil {
return nil, err
}
return workflowproj.CreateNewAppPropsConfigMap(workflow, props), nil
return workflowproj.CreateNewManagedPropsConfigMap(workflow, props), nil
}

func ConfigurePersistence(serviceContainer *corev1.Container, options *operatorapi.PersistenceOptions, defaultSchema, namespace string) *corev1.Container {
Expand Down
57 changes: 31 additions & 26 deletions controllers/profiles/common/object_creators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
Expand All @@ -39,50 +38,56 @@ import (

func Test_ensureWorkflowPropertiesConfigMapMutator(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
platform := test.GetBasePlatform()
// can't be new
cm, _ := WorkflowPropsConfigMapCreator(workflow)
cm.SetUID("1")
cm.SetResourceVersion("1")
reflectCm := cm.(*corev1.ConfigMap)
managedProps, _ := ManagedPropsConfigMapCreator(workflow, platform)
managedProps.SetUID("1")
managedProps.SetResourceVersion("1")
managedPropsCM := managedProps.(*corev1.ConfigMap)

visitor := WorkflowPropertiesMutateVisitor(context.TODO(), nil, workflow, nil)
mutateFn := visitor(cm)
userProps, _ := UserPropsConfigMapCreator(workflow)
userPropsCM := userProps.(*corev1.ConfigMap)
visitor := ManagedPropertiesMutateVisitor(context.TODO(), nil, workflow, nil, userPropsCM)
mutateFn := visitor(managedProps)

assert.NoError(t, mutateFn())
assert.NotEmpty(t, reflectCm.Data[workflowproj.ApplicationPropertiesFileName])
assert.Empty(t, managedPropsCM.Data[workflowproj.ApplicationPropertiesFileName])
assert.NotEmpty(t, managedPropsCM.Data[workflowproj.GetManagedPropertiesFileName(workflow)])

props := properties.MustLoadString(reflectCm.Data[workflowproj.ApplicationPropertiesFileName])
props := properties.MustLoadString(managedPropsCM.Data[workflowproj.GetManagedPropertiesFileName(workflow)])
assert.Equal(t, "8080", props.GetString("quarkus.http.port", ""))

// we change the properties to something different, we add ours and change the default
reflectCm.Data[workflowproj.ApplicationPropertiesFileName] = "quarkus.http.port=9090\nmy.new.prop=1"
visitor(reflectCm)
userPropsCM.Data[workflowproj.ApplicationPropertiesFileName] = "quarkus.http.port=9090\nmy.new.prop=1"
visitor(managedPropsCM)
assert.NoError(t, mutateFn())

// we should preserve the default, and still got ours
props = properties.MustLoadString(reflectCm.Data[workflowproj.ApplicationPropertiesFileName])
props = properties.MustLoadString(managedPropsCM.Data[workflowproj.GetManagedPropertiesFileName(workflow)])
assert.Equal(t, "8080", props.GetString("quarkus.http.port", ""))
assert.Equal(t, "0.0.0.0", props.GetString("quarkus.http.host", ""))
assert.Equal(t, "1", props.GetString("my.new.prop", ""))
assert.NotContains(t, "my.new.prop", props.Keys())
}

func Test_ensureWorkflowPropertiesConfigMapMutator_DollarReplacement(t *testing.T) {
workflow := test.GetBaseSonataFlowWithDevProfile(t.Name())
existingCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
UID: "0000-0001-0002-0003",
},
Data: map[string]string{
workflowproj.ApplicationPropertiesFileName: "mp.messaging.outgoing.kogito_outgoing_stream.url=${kubernetes:services.v1/event-listener}",
},
}
mutateVisitorFn := WorkflowPropertiesMutateVisitor(context.TODO(), nil, workflow, nil)
platform := test.GetBasePlatform()
managedProps, _ := ManagedPropsConfigMapCreator(workflow, platform)
managedProps.SetName(workflow.Name)
managedProps.SetNamespace(workflow.Namespace)
managedProps.SetUID("0000-0001-0002-0003")
managedPropsCM := managedProps.(*corev1.ConfigMap)

userProps, _ := UserPropsConfigMapCreator(workflow)
userPropsCM := userProps.(*corev1.ConfigMap)
userPropsCM.Data[workflowproj.ApplicationPropertiesFileName] = "mp.messaging.outgoing.kogito_outgoing_stream.url=${kubernetes:services.v1/event-listener}"

mutateVisitorFn := ManagedPropertiesMutateVisitor(context.TODO(), nil, workflow, nil, userPropsCM)

err := mutateVisitorFn(existingCM)()
err := mutateVisitorFn(managedPropsCM)()
assert.NoError(t, err)
assert.Contains(t, existingCM.Data[workflowproj.ApplicationPropertiesFileName], "${kubernetes:services.v1/event-listener}")
assert.NotContains(t, managedPropsCM.Data[workflowproj.GetManagedPropertiesFileName(workflow)], "mp.messaging.outgoing.kogito_outgoing_stream.url")
// assert.Contains(t, managedPropsCM.Data[workflowproj.GetManagedPropertiesFileName(workflow)], "${kubernetes:services.v1/event-listener}")
}

func TestMergePodSpec(t *testing.T) {
Expand Down
33 changes: 16 additions & 17 deletions controllers/profiles/common/properties/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,34 @@ func (a *appPropertyHandler) WithServiceDiscovery(ctx context.Context, catalog d
}

func (a *appPropertyHandler) Build() string {
var props *properties.Properties
var userProps *properties.Properties
var propErr error = nil
if len(a.userProperties) == 0 {
props = properties.NewProperties()
userProps = properties.NewProperties()
} else {
props, propErr = properties.LoadString(a.userProperties)
userProps, propErr = properties.LoadString(a.userProperties)
}
if propErr != nil {
klog.V(log.D).InfoS("Can't load user's property", "workflow", a.workflow.Name, "namespace", a.workflow.Namespace, "properties", a.userProperties)
props = properties.NewProperties()
userProps = properties.NewProperties()
}
// Disable expansions since it's not our responsibility
// Property expansion means resolving ${} within the properties and environment context. Quarkus will do that in runtime.
props.DisableExpansion = true
userProps.DisableExpansion = true

removeDiscoveryProperties(props)
removeDiscoveryProperties(userProps)
discoveryProps := properties.NewProperties()
if a.requireServiceDiscovery() {
// produce the MicroProfileConfigServiceCatalog properties for the service discovery property values if any.
discoveryProperties := generateDiscoveryProperties(a.ctx, a.catalog, props, a.workflow)
if discoveryProperties.Len() > 0 {
props.Merge(discoveryProperties)
}
discoveryProps.Merge(generateDiscoveryProperties(a.ctx, a.catalog, userProps, a.workflow))
}
props = utils.NewApplicationPropertiesBuilder().
WithInitialProperties(props).
userProps = utils.NewApplicationPropertiesBuilder().
WithInitialProperties(discoveryProps).
WithImmutableProperties(properties.MustLoadString(immutableApplicationProperties)).
WithDefaultMutableProperties(a.defaultMutableProperties).
Build()

return props.String()
return userProps.String()
}

// withKogitoServiceUrl adds the property kogitoServiceUrlProperty to the application properties.
Expand Down Expand Up @@ -135,13 +133,14 @@ func (a *appPropertyHandler) addDefaultMutableProperty(name string, value string
}

// NewAppPropertyHandler creates a property handler for a given workflow to execute in the provided platform.
// This handler is intended to build the application properties required by the workflow to execute properly, note that
// the produced properties might vary depending on the platfom, for example, if the job service managed by the platform
// This handler is intended to build the managed application properties required by the workflow to execute properly together with
// the user properties defined in the user-managed ConfigMap.
// Note that the produced properties might vary depending on the platfom, for example, if the job service managed by the platform
// a particular set of properties will be added, etc.
// By default, the following properties are incorporated:
// The set of immutable properties provided by the operator. (user can never change)
// The set of defaultMutableProperties that are provided by the operator, and that the user might overwrite if it changes
// the workflow ConfigMap. This set includes for example the required properties to connect with the data index and the
// The set of defaultMutableProperties that are provided by the operator, and that the user cannot overwrite even if it changes
// the user-managed ConfigMap. This set includes for example the required properties to connect with the data index and the
// job service when any of these services are managed by the platform.
func NewAppPropertyHandler(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (AppPropertyHandler, error) {
handler := &appPropertyHandler{
Expand Down
Loading

0 comments on commit 2964c1f

Please sign in to comment.