Skip to content

Commit

Permalink
Further improvements and add jobs permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
rhkp committed Oct 2, 2024
1 parent 8922cfc commit f7b037c
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 28 deletions.
10 changes: 7 additions & 3 deletions api/v1alpha08/sonataflowplatform_services_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
// ServicesPlatformSpec describes the desired service configuration for workflows without the `sonataflow.org/profile: dev` annotation.
type ServicesPlatformSpec struct {
// true = Use DB Migration Job with DB Migrator tool image
// false = Use built-in DB migration capability within services e.g. DI/JS
// false = Use built-in DB migration capability within services e.g. DI/JS, use MigrateDBOnStartUp flag
// +optional
// +default: false
JobBasedDbMigration bool `json:"jobBasedDbMigration,omitempty"`
// +default: true
JobBasedDbMigrationDI bool `json:"jobBasedDbMigrationDI,omitempty"`

// +optional
// +default: true
JobBasedDbMigrationJS bool `json:"jobBasedDbMigrationJS,omitempty"`

// Deploys the Data Index service for use by workflows without the `sonataflow.org/profile: dev` annotation.
// +optional
Expand Down
6 changes: 4 additions & 2 deletions bundle/manifests/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8696,10 +8696,12 @@ spec:
type: string
type: object
type: object
jobBasedDbMigration:
jobBasedDbMigrationDI:
description: |-
true = Use DB Migration Job with DB Migrator tool image
false = Use built-in DB migration capability within services e.g. DI/JS
false = Use built-in DB migration capability within services e.g. DI/JS, use MigrateDBOnStartUp flag
type: boolean
jobBasedDbMigrationJS:
type: boolean
jobService:
description: 'Deploys the Job service for use by workflows without
Expand Down
6 changes: 4 additions & 2 deletions config/crd/bases/sonataflow.org_sonataflowplatforms.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8696,10 +8696,12 @@ spec:
type: string
type: object
type: object
jobBasedDbMigration:
jobBasedDbMigrationDI:
description: |-
true = Use DB Migration Job with DB Migrator tool image
false = Use built-in DB migration capability within services e.g. DI/JS
false = Use built-in DB migration capability within services e.g. DI/JS, use MigrateDBOnStartUp flag
type: boolean
jobBasedDbMigrationJS:
type: boolean
jobService:
description: 'Deploys the Job service for use by workflows without
Expand Down
12 changes: 12 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/platform/dbMigratorJob.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ func NewDBMigratorJobData(ctx context.Context, client client.Client, platform *o
quarkusDatasourceJobsservicePassword := ""
quarkusFlywayJobsserviceSchemas := ""

migrateDbDataindex := pshDI.IsServiceEnabledInSpec()
migrateDbDataindex := services.IsJobBasedDBMigrationDI(platform)
if migrateDbDataindex {
quarkusDatasourceDataindexJdbcUrl = platform.Spec.Services.DataIndex.Persistence.PostgreSQL.JdbcUrl
quarkusDatasourceDataindexUsername, _ = services.GetSecretKeyValueString(ctx, client, platform.Spec.Services.DataIndex.Persistence.PostgreSQL.SecretRef.Name, platform.Spec.Services.DataIndex.Persistence.PostgreSQL.SecretRef.UserKey, platform)
quarkusDatasourceDataindexPassword, _ = services.GetSecretKeyValueString(ctx, client, platform.Spec.Services.DataIndex.Persistence.PostgreSQL.SecretRef.Name, platform.Spec.Services.DataIndex.Persistence.PostgreSQL.SecretRef.PasswordKey, platform)
quarkusFlywayDataindexSchemas = getDBSchemaName(platform.Spec.Services.DataIndex.Persistence.PostgreSQL, "defaultDi")
}
migrateDbJobsservice := pshJS.IsServiceEnabledInSpec()
migrateDbJobsservice := services.IsJobBasedDBMigrationJS(platform)
if migrateDbJobsservice {
quarkusDatasourceJobsserviceJdbcUrl = platform.Spec.Services.JobService.Persistence.PostgreSQL.JdbcUrl
quarkusDatasourceJobsserviceUsername, _ = services.GetSecretKeyValueString(ctx, client, platform.Spec.Services.JobService.Persistence.PostgreSQL.SecretRef.Name, platform.Spec.Services.JobService.Persistence.PostgreSQL.SecretRef.UserKey, platform)
Expand Down
24 changes: 16 additions & 8 deletions internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,27 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform)

// checkNReportInconsistentDBMigrationFlags emit warning in logs, if Job based migration is true and also DI/JS migrateDBOnStartUp is true, does not return error
func (action *serviceAction) checkNReportInconsistentDBMigrationFlags(platform *operatorapi.SonataFlowPlatform, pshDI services.PlatformServiceHandler, pshJS services.PlatformServiceHandler) {
isJobBasedDBMigration := services.IsJobBasedDBMigration(platform)
diMigrateDBOnStartUp := false
jsMigrateDBOnStartUp := false
isJobBasedDBMigrationDI := services.IsJobBasedDBMigrationDI(platform)
isJobBasedDBMigrationJS := services.IsJobBasedDBMigrationJS(platform)
isJobBasedDBMigration := isJobBasedDBMigrationDI || isJobBasedDBMigrationJS

migrateDBOnStartUpDI := false
migrateDBOnStartUpJS := false
if pshDI.IsPersistenceSetInSpec() {
diMigrateDBOnStartUp = platform.Spec.Services.DataIndex.Persistence.MigrateDBOnStartUp
migrateDBOnStartUpDI = platform.Spec.Services.DataIndex.Persistence.MigrateDBOnStartUp
}

if pshJS.IsPersistenceSetInSpec() {
jsMigrateDBOnStartUp = platform.Spec.Services.JobService.Persistence.MigrateDBOnStartUp
migrateDBOnStartUpJS = platform.Spec.Services.JobService.Persistence.MigrateDBOnStartUp
}

if isJobBasedDBMigration && (diMigrateDBOnStartUp || jsMigrateDBOnStartUp) {
klog.V(log.W).InfoS("Inconsistent DB migration flags detected and it may cause unexpected errors or behaviours, please check SonataFlowPlatform deployment: ", "jobBasedDBMigration", isJobBasedDBMigration, "diMigrateDBOnStartUp", diMigrateDBOnStartUp, "jsMigrateDBOnStartUp", jsMigrateDBOnStartUp)
isServiceBasedDBMigration := migrateDBOnStartUpDI || migrateDBOnStartUpJS

// Check if both job based and service based db migration is specified
if (isJobBasedDBMigrationDI && migrateDBOnStartUpDI) || (isJobBasedDBMigrationJS && migrateDBOnStartUpJS) {
klog.V(log.W).InfoS("Both job based DB migration and service based DB migration flags detected, which may cause unexpected errors or behaviours, please check SonataFlowPlatform deployment: ", "jobBasedDBMigrationDI", isJobBasedDBMigrationDI, "jobBasedDBMigrationJS", isJobBasedDBMigrationJS, "migrateDBOnStartUpDI", migrateDBOnStartUpDI, "migrateDBOnStartUpJS", migrateDBOnStartUpJS)
} else if !isJobBasedDBMigration && !isServiceBasedDBMigration {
klog.V(log.I).InfoS("No job based or service based db migration flags specified, the services will expect the tables needed by them in the configured database: ", "jobBasedDBMigrationDI", isJobBasedDBMigrationDI, "jobBasedDBMigrationJS", isJobBasedDBMigrationJS, "migrateDBOnStartUpDI", migrateDBOnStartUpDI, "migrateDBOnStartUpJS", migrateDBOnStartUpJS)
}
}

Expand Down Expand Up @@ -123,7 +131,7 @@ func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.S
psJS := services.NewJobServiceHandler(platform)

// Invoke DB Migration only if both or either DI/JS services are requested, in addition to jobBasedDBMigration
if services.IsJobBasedDBMigration(platform) && (psDI.IsServiceSetInSpec() || psJS.IsServiceSetInSpec()) {
if (services.IsJobBasedDBMigrationDI(platform) || services.IsJobBasedDBMigrationJS(platform)) && (psDI.IsServiceSetInSpec() || psJS.IsServiceSetInSpec()) {
klog.V(log.I).InfoS("Starting DB Migration Job: ")
err := action.createOrUpdateDBMigrationJob(ctx, action.client, platform, psDI, psJS)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions internal/controller/platform/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,16 @@ func isServicesSet(platform *operatorapi.SonataFlowPlatform) bool {
return platform != nil && platform.Spec.Services != nil
}

func IsJobBasedDBMigration(platform *operatorapi.SonataFlowPlatform) bool {
func IsJobBasedDBMigrationDI(platform *operatorapi.SonataFlowPlatform) bool {
if platform != nil && platform.Spec.Services != nil {
return platform.Spec.Services.JobBasedDbMigration
return platform.Spec.Services.JobBasedDbMigrationDI
}
return false
}

func IsJobBasedDBMigrationJS(platform *operatorapi.SonataFlowPlatform) bool {
if platform != nil && platform.Spec.Services != nil {
return platform.Spec.Services.JobBasedDbMigrationJS
}
return false
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/sonataflowplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type SonataFlowPlatformReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflowplatforms/finalizers,verbs=update
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down
18 changes: 16 additions & 2 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9230,10 +9230,12 @@ spec:
type: string
type: object
type: object
jobBasedDbMigration:
jobBasedDbMigrationDI:
description: |-
true = Use DB Migration Job with DB Migrator tool image
false = Use built-in DB migration capability within services e.g. DI/JS
false = Use built-in DB migration capability within services e.g. DI/JS, use MigrateDBOnStartUp flag
type: boolean
jobBasedDbMigrationJS:
type: boolean
jobService:
description: 'Deploys the Job service for use by workflows without
Expand Down Expand Up @@ -27798,6 +27800,18 @@ kind: ClusterRole
metadata:
name: sonataflow-operator-manager-role
rules:
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down
11 changes: 5 additions & 6 deletions test/e2e/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
cmd.Stdin = bytes.NewBuffer(manifests)
_, err := utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

By("Wait for SonatatFlowPlatform CR to complete deployment")
// wait for service deployments to be ready
EventuallyWithOffset(1, func() error {
cmd = exec.Command("kubectl", "wait", "pod", "-n", targetNamespace, "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "--for", "condition=Ready", "--timeout=5s")
_, err = utils.Run(cmd)
return err
}, 10*time.Minute, 5).Should(Succeed())

By("Evaluate status of all service's health endpoint")
cmd = exec.Command("kubectl", "get", "pod", "-l", "app.kubernetes.io/name in (jobs-service,data-index-service)", "-n", targetNamespace, "-ojsonpath={.items[*].metadata.name}")
output, err := utils.Run(cmd)
Expand All @@ -111,9 +111,8 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
}
})
})

})


var _ = Context("with platform services", func() {

Expand Down Expand Up @@ -249,7 +248,7 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
cmd.Stdin = bytes.NewBuffer(manifests)
_, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

By("Wait for SonatatFlowPlatform CR to complete deployment")
// wait for service deployments to be ready
EventuallyWithOffset(1, func() error {
Expand All @@ -266,7 +265,7 @@ var _ = Describe("Platform Use Cases :: ", Label("platform"), Ordered, func() {
verifyHealthStatusInPod(pn, targetNamespace)
}
},
FEntry("Job Service and Data Index come up with job based db migration", test.GetSonataFlowE2EPlatformPersistenceSampleDataDirectory("job_based_db_migration")),
Entry("Job Service and Data Index come up with job based db migration", test.GetSonataFlowE2EPlatformPersistenceSampleDataDirectory("job_based_db_migration")),
)

DescribeTable("when deploying a SonataFlowPlatform CR with brokers", func(testcaseDir string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ spec:
strategyOptions:
KanikoBuildCacheEnabled: "true"
services:
jobBasedDbMigration: true
jobBasedDbMigrationDI: true
jobBasedDbMigrationJS: true
dataIndex:
enabled: true
persistence:
Expand Down

0 comments on commit f7b037c

Please sign in to comment.