Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync main branch with Apache main branch (OSL 1.35 cut-off) #91

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/apache/incubator-kie-kogito-serverless-operator/api

go 1.22.8
go 1.22.0

require (
github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.2
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
knative.dev/pkg v0.0.0-20231023151236-29775d7c9e5c
Expand Down
2 changes: 1 addition & 1 deletion api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/relvacode/iso8601 v1.4.0 h1:GsInVSEJfkYuirYFxa80nMLbH2aydgZpIf52gYZXUJs=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1 h1:9NnaYGhSKZj19rRC8gTZ3IVJJ4EjFG0LJuQe/bt510Q=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/sosodev/duration v1.3.1 h1:qtHBDMQ6lvMQsL15g4aopM4HEfOaYuhWBw3NPTtlqq4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
Expand Down
2 changes: 1 addition & 1 deletion bddframework/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/apache/incubator-kie-kogito-serverless-operator/bddframework

go 1.22.8
go 1.22.0

// Internal dependencies
replace github.com/apache/incubator-kie-kogito-serverless-operator => ../
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ data:
# If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data
# Index Service reducing the number of produced events. Set to false to send individual events.
kogitoEventsGrouping: true
# If true, the accumulated workflow status change events will be sent in binary mode. (reduces the evens size)
kogitoEventsGroupingBinary: true
# If true, the accumulated workflow status change events, when sent in binary mode, will be gzipped at the cost of
# some performance.
kogitoEventsGroupingCompress: false
kind: ConfigMap
metadata:
name: sonataflow-operator-controllers-config
7 changes: 6 additions & 1 deletion config/manager/controllers_cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,9 @@ postgreSQLPersistenceExtensions:
version: 999-20240912-SNAPSHOT
# If true, the workflow deployments will be configured to send accumulated workflow status change events to the Data
# Index Service reducing the number of produced events. Set to false to send individual events.
kogitoEventsGrouping: true
kogitoEventsGrouping: true
# If true, the accumulated workflow status change events will be sent in binary mode. (reduces the evens size)
kogitoEventsGroupingBinary: true
# If true, the accumulated workflow status change events, when sent in binary mode, will be gzipped at the cost of
# some performance.
kogitoEventsGroupingCompress: false
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/apache/incubator-kie-kogito-serverless-operator

go 1.22.8
go 1.22.0

// Internal dependencies
replace (
Expand All @@ -24,7 +24,7 @@ require (
github.com/openshift/client-go v0.0.0-20240528061634-b054aa794d87
github.com/pkg/errors v0.9.1
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.55.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 v2.4.2
github.com/stretchr/testify v1.9.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.3.0 h1:uIkTLo0AGRc8l7h5l9r+GcYi9qfVP
github.com/santhosh-tekuri/jsonschema/v5 v5.3.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1 h1:9NnaYGhSKZj19rRC8gTZ3IVJJ4EjFG0LJuQe/bt510Q=
github.com/serverlessworkflow/sdk-go/v2 v2.4.1/go.mod h1:gl5WYsxKseaozFkHJwWNO5EGcG7Zztqk2HGuqeCovj4=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2/go.mod h1:WGJR0YhXp035Y/toMKwHeIT5/EDEkDEDozn6VIGSUqI=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
Expand Down
4 changes: 2 additions & 2 deletions go.work
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
go 1.22.8
go 1.22.0

use (
.
Expand All @@ -15,7 +15,7 @@ replace (
github.com/openshift/client-go => github.com/openshift/client-go v0.0.0-20230503144108-75015d2347cb

// Main dependencies sync
github.com/serverlessworkflow/sdk-go/v2 => github.com/serverlessworkflow/sdk-go/v2 v2.4.1
github.com/serverlessworkflow/sdk-go/v2 => github.com/serverlessworkflow/sdk-go/v2 v2.4.2
k8s.io/api => k8s.io/api v0.31.1
k8s.io/apimachinery => k8s.io/apimachinery v0.31.1
k8s.io/client-go => k8s.io/client-go v0.31.1
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2690,6 +2690,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/sclevine/spec v1.2.0 h1:1Jwdf9jSfDl9NVmt8ndHqbTZ7XCCPbh1jI3hkDBHVYA=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2 h1:dqRa/i5J885rk0bGIXzUVLwEFfRWB9gpQfOdXlbejsI=
github.com/serverlessworkflow/sdk-go/v2 v2.4.2/go.mod h1:WGJR0YhXp035Y/toMKwHeIT5/EDEkDEDozn6VIGSUqI=
github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7 h1:80VN+vGkqM773Br/uNNTSheo3KatTgV8IpjIKjvVLng=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 h1:udFKJ0aHUL60LboW/A+DfgoHVedieIzIXE8uylPue0U=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
Expand Down
2 changes: 1 addition & 1 deletion images/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

- name: operator-builder
version: 999.0.0-snapshot
from: 'golang:1.22.8'
from: 'golang:1.22'
description: Builder Image for the Operator

args:
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/cfg/controllers_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type ControllersCfg struct {
BuilderConfigMapName string `yaml:"builderConfigMapName,omitempty"`
PostgreSQLPersistenceExtensions []GAV `yaml:"postgreSQLPersistenceExtensions,omitempty"`
KogitoEventsGrouping bool `yaml:"kogitoEventsGrouping,omitempty"`
KogitoEventsGroupingBinary bool `yaml:"KogitoEventsGroupingBinary,omitempty"`
KogitoEventsGroupingCompress bool `yaml:"KogitoEventsGroupingCompress,omitempty"`
}

// InitializeControllersCfg initializes the platform configuration for this instance.
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/cfg/controllers_cfg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestInitializeControllersCfgAt_ValidFile(t *testing.T) {
Version: "999-SNAPSHOT",
}, postgresExtensions[2])
assert.True(t, cfg.KogitoEventsGrouping)
assert.True(t, cfg.KogitoEventsGroupingBinary)
assert.False(t, cfg.KogitoEventsGroupingCompress)
}

func TestInitializeControllersCfgAt_FileNotFound(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/cfg/testdata/controllers-cfg-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ postgreSQLPersistenceExtensions:
- groupId: org.kie
artifactId: kie-addons-quarkus-persistence-jdbc
version: 999-SNAPSHOT
kogitoEventsGrouping: true
kogitoEventsGrouping: true
kogitoEventsGroupingBinary: true
kogitoEventsGroupingCompress: false
6 changes: 3 additions & 3 deletions internal/controller/platform/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package platform
import (
"context"

"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"

v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
corev1 "k8s.io/api/core/v1"
)

// Action --.
Expand All @@ -38,7 +38,7 @@ type Action interface {
CanHandle(platform *v08.SonataFlowPlatform) bool

// executes the handling function
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error)
Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error)
}

type baseAction struct {
Expand Down
6 changes: 4 additions & 2 deletions internal/controller/platform/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package platform
import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
v08 "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
)
Expand All @@ -43,9 +45,9 @@ func (action *createAction) CanHandle(platform *v08.SonataFlowPlatform) bool {
return platform.Status.IsCreating()
}

func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, error) {
func (action *createAction) Handle(ctx context.Context, platform *v08.SonataFlowPlatform) (*v08.SonataFlowPlatform, *corev1.Event, error) {
//TODO: Perform the actions needed for the Platform creation
platform.Status.Manager().MarkTrue(api.SucceedConditionType)

return platform, nil
return platform, nil, nil
}
16 changes: 8 additions & 8 deletions internal/controller/platform/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ func (action *initializeAction) CanHandle(platform *operatorapi.SonataFlowPlatfo
return platform.Status.GetTopLevelCondition().IsUnknown() || platform.Status.IsDuplicated()
}

func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *initializeAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
duplicate, err := action.isPrimaryDuplicate(ctx, platform)
if err != nil {
return nil, err
return nil, nil, err
}
if duplicate {
// another platform already present in the namespace
if !platform.Status.IsDuplicated() {
plat := platform.DeepCopy()
plat.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformDuplicatedReason, "")
return plat, nil
return plat, nil, nil
}

return nil, nil
return nil, nil, nil
}

if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
return nil, err
return nil, nil, err
}
// nolint: staticcheck
if platform.Spec.Build.Config.BuildStrategy == operatorapi.OperatorBuildStrategy {
Expand All @@ -88,13 +88,13 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
klog.V(log.I).InfoS("Create persistent volume claim")
err := createPersistentVolumeClaim(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
// Create the Kaniko warmer pod that caches the base image into the SonataFlow builder volume
klog.V(log.I).InfoS("Create Kaniko cache warmer pod")
err = createKanikoCacheWarmerPod(ctx, action.client, platform)
if err != nil {
return nil, err
return nil, nil, err
}
platform.Status.Manager().MarkFalse(api.SucceedConditionType, operatorapi.PlatformWarmingReason, "")
} else {
Expand All @@ -106,7 +106,7 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
}
platform.Status.Version = metadata.SpecVersion

return platform, nil
return platform, nil, nil
}

// TODO: move this to Kaniko packages based on the platform context
Expand Down
56 changes: 33 additions & 23 deletions internal/controller/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,38 +61,38 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform)
return platform.Status.IsReady()
}

func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, *corev1.Event, error) {
// Refresh applied configuration
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
return nil, nil, err
}

psDI := services.NewDataIndexHandler(platform)
if psDI.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psDI); err != nil {
return nil, event, err
}
}

psJS := services.NewJobServiceHandler(platform)
if psJS.IsServiceSetInSpec() {
if err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, err
if event, err := createOrUpdateServiceComponents(ctx, action.client, platform, psJS); err != nil {
return nil, event, err
}
}

return platform, nil
return platform, nil, nil
}

func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateServiceComponents(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
if err := createOrUpdateConfigMap(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateDeployment(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
if err := createOrUpdateService(ctx, client, platform, psh); err != nil {
return err
return nil, err
}
return createOrUpdateKnativeResources(ctx, client, platform, psh)
}
Expand Down Expand Up @@ -159,6 +159,7 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
MatchLabels: selectorLbl,
},
Replicas: &replicas,
Strategy: psh.GetDeploymentStrategy(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: lbl,
Expand Down Expand Up @@ -200,6 +201,9 @@ func createOrUpdateDeployment(ctx context.Context, client client.Client, platfor
if op, err := controllerutil.CreateOrUpdate(ctx, client, serviceDeployment, func() error {
knative.SaveKnativeData(&serviceDeploymentSpec.Template.Spec, &serviceDeployment.Spec.Template.Spec)
err := mergo.Merge(&(serviceDeployment.Spec), serviceDeploymentSpec, mergo.WithOverride)
// mergo.Merge algorithm is not setting the serviceDeployment.Spec.Replicas when the
// *serviceDeploymentSpec.Replicas is 0. Making impossible to scale to zero. Ensure the value.
serviceDeployment.Spec.Replicas = serviceDeploymentSpec.Replicas
if err != nil {
return err
}
Expand Down Expand Up @@ -307,24 +311,24 @@ func setSonataFlowPlatformFinalizer(ctx context.Context, c client.Client, platfo
return nil
}

func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) error {
func createOrUpdateKnativeResources(ctx context.Context, client client.Client, platform *operatorapi.SonataFlowPlatform, psh services.PlatformServiceHandler) (*corev1.Event, error) {
lbl, _ := getLabels(platform, psh)
objs, err := psh.GenerateKnativeResources(platform, lbl)
objs, event, err := psh.GenerateKnativeResources(platform, lbl)
if err != nil {
return err
return event, err
}
// Create or update triggers
for _, obj := range objs {
if triggerDef, ok := obj.(*eventingv1.Trigger); ok {
if platform.Namespace == obj.GetNamespace() {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
} else {
// This is for Knative trigger in a different namespace
// Set the finalizer for trigger cleanup when the platform is deleted
if err := setSonataFlowPlatformFinalizer(ctx, client, platform); err != nil {
return err
return nil, err
}
}
trigger := &eventingv1.Trigger{
Expand All @@ -335,21 +339,21 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
addToSonataFlowPlatformTriggerList(platform, trigger)
}
}

if err := SafeUpdatePlatformStatus(ctx, platform); err != nil {
return err
return nil, err
}

// Create or update sinkbindings
for _, obj := range objs {
if sbDef, ok := obj.(*sourcesv1.SinkBinding); ok {
if err := controllerutil.SetControllerReference(platform, obj, client.Scheme()); err != nil {
return err
return nil, err
}
sinkBinding := &sourcesv1.SinkBinding{
ObjectMeta: sbDef.ObjectMeta,
Expand All @@ -359,18 +363,24 @@ func createOrUpdateKnativeResources(ctx context.Context, client client.Client, p
return nil
})
if err != nil {
return err
return nil, err
}
kSinkInjected, err := psh.CheckKSinkInjected()
if err != nil {
return err
return nil, err
}
if !kSinkInjected {
return fmt.Errorf("waiting for K_SINK injection for %s to complete", psh.GetServiceName())
msg := fmt.Sprintf("waiting for K_SINK injection for service %s to complete", psh.GetServiceName())
event := &corev1.Event{
Type: corev1.EventTypeWarning,
Reason: services.WaitingKnativeEventing,
Message: msg,
}
return event, fmt.Errorf(msg)
}
}
}
return nil
return nil, nil
}

func addToSonataFlowPlatformTriggerList(platform *operatorapi.SonataFlowPlatform, trigger *eventingv1.Trigger) {
Expand Down
Loading
Loading