Skip to content

Commit

Permalink
Refactor logic to retrieve platform CR when deploying workflow to inj…
Browse files Browse the repository at this point in the history
…ect persistence configuration when platform provides one

Signed-off-by: Jordi Gil <[email protected]>
  • Loading branch information
jordigilh committed Feb 5, 2024
1 parent aa75275 commit e407a8d
Show file tree
Hide file tree
Showing 32 changed files with 234 additions and 238 deletions.
60 changes: 60 additions & 0 deletions api/v1alpha08/sonataflow_persistence_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 Apache Software Foundation (ASF)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha08

// PersistencePostgreSQL configure postgresql connection for service(s).
// +kubebuilder:validation:MinProperties=2
// +kubebuilder:validation:MaxProperties=2
type PersistencePostgreSQL struct {
// Secret reference to the database user credentials
SecretRef PostgreSQLSecretOptions `json:"secretRef"`
// Service reference to postgresql datasource. Mutually exclusive to jdbcUrl.
// +optional
ServiceRef *PostgreSQLServiceOptions `json:"serviceRef,omitempty"`
// PostgreSql JDBC URL. Mutually exclusive to serviceRef.
// e.g. "jdbc:postgresql://host:port/database?currentSchema=data-index-service"
// +optional
JdbcUrl string `json:"jdbcUrl,omitempty"`
}

// PostgreSQLSecretOptions use credential secret for postgresql connection.
type PostgreSQLSecretOptions struct {
// Name of the postgresql credentials secret.
Name string `json:"name"`
// Defaults to POSTGRESQL_USER
// +optional
UserKey string `json:"userKey,omitempty"`
// Defaults to POSTGRESQL_PASSWORD
// +optional
PasswordKey string `json:"passwordKey,omitempty"`
}

// PostgreSQLServiceOptions use k8s service to configure postgresql jdbc url.
type PostgreSQLServiceOptions struct {
// Name of the postgresql k8s service.
Name string `json:"name"`
// Namespace of the postgresql k8s service. Defaults to the SonataFlowPlatform's local namespace.
// +optional
Namespace string `json:"namespace,omitempty"`
// Port to use when connecting to the postgresql k8s service. Defaults to 5432.
// +optional
Port *int `json:"port,omitempty"`
// Name of postgresql database to be used. Defaults to "sonataflow"
// +optional
DatabaseName string `json:"databaseName,omitempty"`
// Schema of postgresql database to be used. Defaults to "data-index-service"
// +optional
DatabaseSchema string `json:"databaseSchema,omitempty"`
}
2 changes: 1 addition & 1 deletion api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ type SonataFlowSpec struct {
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="podTemplate"
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
// Persistence defines the database persistence configuration for the workflow
Persistence *PersistenceSpec `json:"persistence,omitempty"`
Persistence *PersistencePlatformSpec `json:"persistence,omitempty"`
}

// SonataFlowStatus defines the observed state of SonataFlow
Expand Down
47 changes: 1 addition & 46 deletions api/v1alpha08/sonataflowplatform_services_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,53 +32,8 @@ type ServiceSpec struct {
Enabled *bool `json:"enabled,omitempty"`
// Persists service to a datasource of choice. Ephemeral by default.
// +optional
Persistence *PersistenceSpec `json:"persistence,omitempty"`
Persistence *PersistencePlatformSpec `json:"persistence,omitempty"`
// PodTemplate describes the deployment details of this platform service instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="podTemplate"
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
}

// PersistencePostgreSQL configure postgresql connection for service(s).
// +kubebuilder:validation:MinProperties=2
// +kubebuilder:validation:MaxProperties=2
type PersistencePostgreSQL struct {
// Secret reference to the database user credentials
SecretRef PostgreSQLSecretOptions `json:"secretRef"`
// Service reference to postgresql datasource. Mutually exclusive to jdbcUrl.
// +optional
ServiceRef *PostgreSQLServiceOptions `json:"serviceRef,omitempty"`
// PostgreSql JDBC URL. Mutually exclusive to serviceRef.
// e.g. "jdbc:postgresql://host:port/database?currentSchema=data-index-service"
// +optional
JdbcUrl string `json:"jdbcUrl,omitempty"`
}

// PostgreSQLSecretOptions use credential secret for postgresql connection.
type PostgreSQLSecretOptions struct {
// Name of the postgresql credentials secret.
Name string `json:"name"`
// Defaults to POSTGRESQL_USER
// +optional
UserKey string `json:"userKey,omitempty"`
// Defaults to POSTGRESQL_PASSWORD
// +optional
PasswordKey string `json:"passwordKey,omitempty"`
}

// PostgreSQLServiceOptions use k8s service to configure postgresql jdbc url.
type PostgreSQLServiceOptions struct {
// Name of the postgresql k8s service.
Name string `json:"name"`
// Namespace of the postgresql k8s service. Defaults to the SonataFlowPlatform's local namespace.
// +optional
Namespace string `json:"namespace,omitempty"`
// Port to use when connecting to the postgresql k8s service. Defaults to 5432.
// +optional
Port *int `json:"port,omitempty"`
// Name of postgresql database to be used. Defaults to "sonataflow"
// +optional
DatabaseName string `json:"databaseName,omitempty"`
// Schema of postgresql database to be used. Defaults to "data-index-service"
// +optional
DatabaseSchema string `json:"databaseSchema,omitempty"`
}
6 changes: 3 additions & 3 deletions api/v1alpha08/sonataflowplatform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ type SonataFlowPlatformSpec struct {
// the configuration is used as the persistence for platform services and sonataflow instances
// that don't provide one of their own.
// +optional
Persistence *PersistenceSpec `json:"persistence,omitempty"`
Persistence *PersistencePlatformSpec `json:"persistence,omitempty"`
}

// PersistenceSpec configures the DataBase support for both platform services and workflows. For services, it allows
// PersistencePlatformSpec configures the DataBase support for both platform services and workflows. For services, it allows
// configuring a generic database connectivity if the service does not come with its own configured. In case of workflows,
// the operator will add the necessary JDBC properties to in the workflow's application.properties so that it can communicate
// with the persistence service based on the spec provided here.
// +optional
type PersistenceSpec struct {
type PersistencePlatformSpec struct {
// Connect configured services to a postgresql database.
// +optional
PostgreSQL *PersistencePostgreSQL `json:"postgresql,omitempty"`
Expand Down
38 changes: 19 additions & 19 deletions api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 9 additions & 11 deletions controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,17 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
return nil, err
}

if platform.Spec.Services != nil {
psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
}
psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
}
}

psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
}
psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
}
}

Expand Down
16 changes: 7 additions & 9 deletions controllers/platform/platformutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,13 @@ func setPlatformDefaults(p *operatorapi.SonataFlowPlatform, verbose bool) error
}

// When dataIndex object set, default to enabled if bool not set
if p.Spec.Services != nil {
var enable = true
if p.Spec.Services.DataIndex != nil && p.Spec.Services.DataIndex.Enabled == nil {
p.Spec.Services.DataIndex.Enabled = &enable
}
// When the JobService field has a value, default to enabled if the `Enabled` field's value is nil
if p.Spec.Services.JobService != nil && p.Spec.Services.JobService.Enabled == nil {
p.Spec.Services.JobService.Enabled = &enable
}
var enable = true
if p.Spec.Services.DataIndex != nil && p.Spec.Services.DataIndex.Enabled == nil {
p.Spec.Services.DataIndex.Enabled = &enable
}
// When the JobService field has a value, default to enabled if the `Enabled` field's value is nil
if p.Spec.Services.JobService != nil && p.Spec.Services.JobService.Enabled == nil {
p.Spec.Services.JobService.Enabled = &enable
}
setStatusAdditionalInfo(p)

Expand Down
4 changes: 2 additions & 2 deletions controllers/platform/services/properties_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func setJobServiceJDBC(jdbc string) plfmOptionFn {
p.Spec.Services.JobService = &operatorapi.ServiceSpec{}
}
if p.Spec.Services.JobService.Persistence == nil {
p.Spec.Services.JobService.Persistence = &operatorapi.PersistenceSpec{}
p.Spec.Services.JobService.Persistence = &operatorapi.PersistencePlatformSpec{}
}
if p.Spec.Services.JobService.Persistence.PostgreSQL == nil {
p.Spec.Services.JobService.Persistence.PostgreSQL = &operatorapi.PersistencePostgreSQL{}
Expand All @@ -238,7 +238,7 @@ func setDataIndexJDBC(jdbc string) plfmOptionFn {
p.Spec.Services.DataIndex = &operatorapi.ServiceSpec{}
}
if p.Spec.Services.DataIndex.Persistence == nil {
p.Spec.Services.DataIndex.Persistence = &operatorapi.PersistenceSpec{}
p.Spec.Services.DataIndex.Persistence = &operatorapi.PersistencePlatformSpec{}
}
if p.Spec.Services.DataIndex.Persistence.PostgreSQL == nil {
p.Spec.Services.DataIndex.Persistence.PostgreSQL = &operatorapi.PersistencePostgreSQL{}
Expand Down
19 changes: 7 additions & 12 deletions controllers/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,21 +408,16 @@ func (j JobServiceHandler) GenerateServiceProperties() (*properties.Properties,
props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false")
// add data source reactive URL
if j.hasPostgreSQLConfigured() {
var dataSourceReactiveURL string
var err error
jspec := j.platform.Spec.Services.JobService
var pspec *operatorapi.PersistencePostgreSQL
if j.IsServiceSetInSpec() && jspec.Persistence != nil && jspec.Persistence.PostgreSQL != nil {
dataSourceReactiveURL, err = generateReactiveURL(j.platform.Spec.Services.JobService.Persistence.PostgreSQL, j.GetServiceName(), j.platform.Namespace, constants.DefaultDatabaseName, constants.DefaultPostgreSQLPort)
if err != nil {
return nil, err
}
pspec = j.platform.Spec.Services.JobService.Persistence.PostgreSQL
} else {
p := j.platform.Spec.Persistence.PostgreSQL
var namespace string
if len(p.ServiceRef.Namespace) > 0 {
namespace = fmt.Sprintf(".%s", p.ServiceRef.Namespace)
}
dataSourceReactiveURL = fmt.Sprintf("%s://%s%s:%d/%s?search_path=%s", constants.PersistenceTypePostgreSQL, p.ServiceRef.Name, namespace, *p.ServiceRef.Port, p.ServiceRef.DatabaseName, j.GetServiceName())
pspec = j.platform.Spec.Persistence.PostgreSQL
}
dataSourceReactiveURL, err := generateReactiveURL(pspec, j.GetServiceName(), j.platform.Namespace, constants.DefaultDatabaseName, constants.DefaultPostgreSQLPort)
if err != nil {
return nil, err
}
props.Set(constants.JobServiceDataSourceReactiveURL, dataSourceReactiveURL)
}
Expand Down
10 changes: 5 additions & 5 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}

type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
}
type ObjectEnsurerWithPlatform interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, pl *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
}

// MutateVisitor is a visitor function that mutates the given object before performing any updates in the cluster.
Expand Down Expand Up @@ -73,10 +73,10 @@ type defaultObjectEnsurer struct {
creator ObjectCreator
}

func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) {
func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) {
result := controllerutil.OperationResultNone

object, err := d.creator(workflow, plf)
object, err := d.creator(workflow)
if err != nil {
return nil, result, err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func NewNoopObjectEnsurer() ObjectEnsurer {
type noopObjectEnsurer struct {
}

func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, plf *operatorapi.SonataFlowPlatform, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) {
func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) {
result := controllerutil.OperationResultNone
return nil, result, nil
}
Loading

0 comments on commit e407a8d

Please sign in to comment.