diff --git a/api/v1alpha08/sonataflow_types.go b/api/v1alpha08/sonataflow_types.go index 04aa5440c..ac32fe357 100644 --- a/api/v1alpha08/sonataflow_types.go +++ b/api/v1alpha08/sonataflow_types.go @@ -659,6 +659,9 @@ type SonataFlowSpec struct { PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"` // Persistence defines the database persistence configuration for the workflow Persistence *PersistenceOptionsSpec `json:"persistence,omitempty"` + // Sink describes the sinkBinding details of this SonataFlow instance. + //+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink" + Sink *duckv1.Destination `json:"sink,omitempty"` } // SonataFlowStatus defines the observed state of SonataFlow diff --git a/api/v1alpha08/zz_generated.deepcopy.go b/api/v1alpha08/zz_generated.deepcopy.go index 75e8bf32e..fb2db1872 100644 --- a/api/v1alpha08/zz_generated.deepcopy.go +++ b/api/v1alpha08/zz_generated.deepcopy.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -1174,6 +1175,11 @@ func (in *SonataFlowSpec) DeepCopyInto(out *SonataFlowSpec) { *out = new(PersistenceOptionsSpec) (*in).DeepCopyInto(*out) } + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(duckv1.Destination) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SonataFlowSpec. diff --git a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml index cffce25e9..7597683f5 100644 --- a/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml +++ b/bundle/manifests/sonataflow-operator.clusterserviceversion.yaml @@ -312,6 +312,9 @@ spec: definition. For example, a collection of OpenAPI specification files. displayName: resources path: resources + - description: Sink describes the sinkBinding details of this SonataFlow instance. + displayName: sink + path: sink statusDescriptors: - description: Address is used as a part of Addressable interface (status.address.url) for knative @@ -400,6 +403,36 @@ spec: - patch - update - watch + - apiGroups: + - eventing.knative.dev + resources: + - triggers + - triggers/status + - triggers/finalizers + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch + - apiGroups: + - sources.knative.dev + resources: + - sinkbindings + - sinkbindings/status + - sinkbindings/finalizers + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch - apiGroups: - coordination.k8s.io resources: diff --git a/bundle/manifests/sonataflow.org_sonataflows.yaml b/bundle/manifests/sonataflow.org_sonataflows.yaml index 76c704fb3..6836cd493 100644 --- a/bundle/manifests/sonataflow.org_sonataflows.yaml +++ b/bundle/manifests/sonataflow.org_sonataflows.yaml @@ -9346,6 +9346,53 @@ spec: type: object type: array type: object + sink: + description: Sink describes the sinkBinding details of this SonataFlow + instance. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided by + the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and non-empty + host) pointing to the target or a relative URI. Relative URIs + will be resolved using the base URI retrieved from Ref. + type: string + type: object required: - flow type: object diff --git a/config/crd/bases/sonataflow.org_sonataflows.yaml b/config/crd/bases/sonataflow.org_sonataflows.yaml index e5214329e..a9df118fd 100644 --- a/config/crd/bases/sonataflow.org_sonataflows.yaml +++ b/config/crd/bases/sonataflow.org_sonataflows.yaml @@ -9347,6 +9347,53 @@ spec: type: object type: array type: object + sink: + description: Sink describes the sinkBinding details of this SonataFlow + instance. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided by + the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and non-empty + host) pointing to the target or a relative URI. Relative URIs + will be resolved using the base URI retrieved from Ref. + type: string + type: object required: - flow type: object diff --git a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml index 1f65bbedb..089ae24b4 100644 --- a/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml @@ -196,6 +196,9 @@ spec: definition. For example, a collection of OpenAPI specification files. displayName: resources path: resources + - description: Sink describes the sinkBinding details of this SonataFlow instance. + displayName: sink + path: sink statusDescriptors: - description: Address is used as a part of Addressable interface (status.address.url) for knative diff --git a/config/rbac/builder_role.yaml b/config/rbac/builder_role.yaml index 9d00da3f5..70b2ab54e 100644 --- a/config/rbac/builder_role.yaml +++ b/config/rbac/builder_role.yaml @@ -57,6 +57,36 @@ rules: resources: - roles - rolebindings + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +- apiGroups: + - eventing.knative.dev + resources: + - triggers + - triggers/status + - triggers/finalizers + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +- apiGroups: + - sources.knative.dev + resources: + - sinkbindings + - sinkbindings/status + - sinkbindings/finalizers verbs: - create - delete diff --git a/controllers/profiles/common/constants/workflows.go b/controllers/profiles/common/constants/workflows.go index a64002ad2..8087f963b 100644 --- a/controllers/profiles/common/constants/workflows.go +++ b/controllers/profiles/common/constants/workflows.go @@ -16,4 +16,11 @@ package constants const ( MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery." + KogitoOutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url" + KogitoOutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector" + KogitoIncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector" + KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path" + KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled" + KnativeInjectedEnvVar = "${K_SINK}" + KnativeEventingBrokerDefault = "default" ) diff --git a/controllers/profiles/common/ensurer.go b/controllers/profiles/common/ensurer.go index 980d5cbf5..a1b8c92c3 100644 --- a/controllers/profiles/common/ensurer.go +++ b/controllers/profiles/common/ensurer.go @@ -32,6 +32,7 @@ import ( var _ ObjectEnsurer = &defaultObjectEnsurer{} var _ ObjectEnsurer = &noopObjectEnsurer{} +var _ ObjectsEnsurer = &defaultObjectsEnsurer{} type ObjectEnsurer interface { Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error) @@ -77,22 +78,10 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi result := controllerutil.OperationResultNone object, err := d.creator(workflow) - if err != nil { - return nil, result, err - } - if result, err = controllerutil.CreateOrPatch(ctx, d.c, object, - func() error { - for _, v := range visitors { - if visitorErr := v(object)(); visitorErr != nil { - return visitorErr - } - } - return controllerutil.SetControllerReference(workflow, object, d.c.Scheme()) - }); err != nil { + if err != nil || object == nil { return nil, result, err } - klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace()) - return object, result, nil + return ensureObject(ctx, workflow, visitors, result, d.c, object) } // defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform @@ -136,3 +125,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.So result := controllerutil.OperationResultNone return nil, result, nil } + +// ObjectsEnsurer is an ensurer to apply multiple objects +type ObjectsEnsurer interface { + Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult +} + +type ObjectEnsurerResult struct { + client.Object + Result controllerutil.OperationResult + Error error +} + +func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer { + return &defaultObjectsEnsurer{ + c: client, + creator: creator, + } +} + +type defaultObjectsEnsurer struct { + ObjectsEnsurer + c client.Client + creator ObjectsCreator +} + +func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult { + result := controllerutil.OperationResultNone + + objects, err := d.creator(workflow) + if err != nil { + return []ObjectEnsurerResult{{nil, result, err}} + } + var ensureResult []ObjectEnsurerResult + for _, object := range objects { + ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object) + ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err}) + if err != nil { + return ensureResult + } + } + return ensureResult +} + +func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) { + if result, err := controllerutil.CreateOrPatch(ctx, c, object, + func() error { + for _, v := range visitors { + if visitorErr := v(object)(); visitorErr != nil { + return visitorErr + } + } + return controllerutil.SetControllerReference(workflow, object, c.Scheme()) + }); err != nil { + return nil, result, err + } + klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace()) + return object, result, nil +} diff --git a/controllers/profiles/common/knative.go b/controllers/profiles/common/knative.go new file mode 100644 index 000000000..13f5bc49c --- /dev/null +++ b/controllers/profiles/common/knative.go @@ -0,0 +1,76 @@ +// Copyright 2024 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative" + + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/log" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ KnativeEventingHandler = &knativeObjectManager{} + +type knativeObjectManager struct { + sinkBinding ObjectEnsurer + trigger ObjectsEnsurer + *StateSupport +} + +func NewKnativeEventingHandler(support *StateSupport) KnativeEventingHandler { + return &knativeObjectManager{ + sinkBinding: NewObjectEnsurer(support.C, SinkBindingCreator), + trigger: NewObjectsEnsurer(support.C, TriggersCreator), + StateSupport: support, + } +} + +type KnativeEventingHandler interface { + Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) +} + +func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) { + var objs []client.Object + + if workflow.Spec.Flow.Events == nil { + // skip if no event is found + klog.V(log.I).InfoS("skip knative resource creation as no event is found") + } else if workflow.Spec.Sink == nil { + klog.V(log.I).InfoS("Spec.Sink is not provided") + } else if knativeAvail, err := knative.GetKnativeAvailability(k.Cfg); err != nil || knativeAvail == nil || !knativeAvail.Eventing { + klog.V(log.I).InfoS("Knative Eventing is not installed") + } else { + // create sinkBinding and trigger + sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow) + if err != nil { + return objs, err + } else if sinkBinding != nil { + objs = append(objs, sinkBinding) + } + + triggers := k.trigger.Ensure(ctx, workflow) + for _, trigger := range triggers { + if trigger.Error != nil { + return objs, trigger.Error + } + objs = append(objs, trigger.Object) + } + } + return objs, nil +} diff --git a/controllers/profiles/common/object_creators.go b/controllers/profiles/common/object_creators.go index 7e08366b1..4b28e9806 100644 --- a/controllers/profiles/common/object_creators.go +++ b/controllers/profiles/common/object_creators.go @@ -20,10 +20,21 @@ package common import ( + "fmt" + "strings" + + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef" + + cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" + "github.com/imdario/mergo" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/tracker" "sigs.k8s.io/controller-runtime/pkg/client" operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" @@ -45,6 +56,9 @@ type ObjectCreator func(workflow *operatorapi.SonataFlow) (client.Object, error) // SonataFlowPlatform type ObjectCreatorWithPlatform func(workflow *operatorapi.SonataFlow, platform *operatorapi.SonataFlowPlatform) (client.Object, error) +// ObjectsCreator creates multiple resources +type ObjectsCreator func(workflow *operatorapi.SonataFlow) ([]client.Object, error) + const ( defaultHTTPServicePort = 80 @@ -199,6 +213,85 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { return service, nil } +// SinkBindingCreator is an ObjectsCreator for SinkBinding. +// It will create v1.SinkBinding based on events defined in workflow. +func SinkBindingCreator(workflow *operatorapi.SonataFlow) (client.Object, error) { + lbl := workflowproj.GetDefaultLabels(workflow) + + // skip if no produced event is found + if workflow.Spec.Sink == nil || !workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { + return nil, nil + } + + sink := workflow.Spec.Sink + + // subject must be deployment to inject K_SINK, service won't work + sinkBinding := &sourcesv1.SinkBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: strings.ToLower(fmt.Sprintf("%s-sb", workflow.Name)), + Namespace: workflow.Namespace, + Labels: lbl, + }, + Spec: sourcesv1.SinkBindingSpec{ + SourceSpec: duckv1.SourceSpec{ + Sink: *sink, + }, + BindingSpec: duckv1.BindingSpec{ + Subject: tracker.Reference{ + Name: workflow.Name, + Namespace: workflow.Namespace, + APIVersion: "apps/v1", + Kind: "Deployment", + }, + }, + }, + } + return sinkBinding, nil +} + +// TriggersCreator is an ObjectsCreator for Triggers. +// It will create a list of eventingv1.Trigger based on events defined in workflow. +func TriggersCreator(workflow *operatorapi.SonataFlow) ([]client.Object, error) { + var resultObjects []client.Object + lbl := workflowproj.GetDefaultLabels(workflow) + + //consumed + events := workflow.Spec.Flow.Events + for _, event := range events { + // filter out produce events + if event.Kind == cncfmodel.EventKindProduced { + continue + } + + // construct eventingv1.Trigger + trigger := &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: strings.ToLower(fmt.Sprintf("%s-%s-trigger", workflow.Name, event.Name)), + Namespace: workflow.Namespace, + Labels: lbl, + }, + Spec: eventingv1.TriggerSpec{ + Broker: constants.KnativeEventingBrokerDefault, + Filter: &eventingv1.TriggerFilter{ + Attributes: eventingv1.TriggerFilterAttributes{ + "type": event.Type, + }, + }, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Name: workflow.Name, + Namespace: workflow.Namespace, + APIVersion: "v1", + Kind: "Service", + }, + }, + }, + } + resultObjects = append(resultObjects, trigger) + } + return resultObjects, nil +} + // OpenShiftRouteCreator is an ObjectCreator for a basic Route for a workflow running on OpenShift. // It enables the exposition of the service using an OpenShift Route. // See: https://github.com/openshift/api/blob/d170fcdc0fa638b664e4f35f2daf753cb4afe36b/route/v1/route.crd.yaml diff --git a/controllers/profiles/common/object_creators_test.go b/controllers/profiles/common/object_creators_test.go index beb04369e..236aa025c 100644 --- a/controllers/profiles/common/object_creators_test.go +++ b/controllers/profiles/common/object_creators_test.go @@ -23,6 +23,8 @@ import ( "context" "testing" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "github.com/magiconair/properties" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" @@ -176,6 +178,35 @@ func TestMergePodSpec_OverrideContainers(t *testing.T) { assert.Empty(t, flowContainer.Env) } +func Test_ensureWorkflowSinkBindingIsCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + + //On Kubernetes we want the service exposed in Dev with NodePort + sinkBinding, _ := SinkBindingCreator(workflow) + sinkBinding.SetUID("1") + sinkBinding.SetResourceVersion("1") + + reflectSinkBinding := sinkBinding.(*sourcesv1.SinkBinding) + + assert.NotNil(t, reflectSinkBinding) + assert.NotNil(t, reflectSinkBinding.Spec) + assert.NotEmpty(t, reflectSinkBinding.Spec.Sink) + assert.Equal(t, reflectSinkBinding.Spec.Sink.Ref.Kind, "Broker") +} + +func Test_ensureWorkflowTriggersAreCreated(t *testing.T) { + workflow := test.GetVetEventSonataFlow(t.Name()) + + //On Kubernetes we want the service exposed in Dev with NodePort + triggers, _ := TriggersCreator(workflow) + + assert.NotEmpty(t, triggers) + assert.Len(t, triggers, 2) + for _, trigger := range triggers { + assert.Contains(t, []string{"vet-vetappointmentrequestreceived-trigger", "vet-vetappointmentinfo-trigger"}, trigger.GetName()) + } +} + func TestMergePodSpec_WithPostgreSQL_and_JDBC_URL_field(t *testing.T) { workflow := test.GetBaseSonataFlow(t.Name()) workflow.Spec = v1alpha08.SonataFlowSpec{ diff --git a/controllers/profiles/common/properties/application.go b/controllers/profiles/common/properties/application.go index 414891405..9b5926085 100644 --- a/controllers/profiles/common/properties/application.go +++ b/controllers/profiles/common/properties/application.go @@ -40,9 +40,6 @@ import ( var ( immutableApplicationProperties = fmt.Sprintf("quarkus.http.port=%d\n"+ "quarkus.http.host=0.0.0.0\n"+ - // We disable the Knative health checks to not block the pod to run if Knative objects are not available - // See: https://kiegroup.github.io/kogito-docs/serverlessworkflow/latest/eventing/consume-produce-events-with-knative-eventing.html#ref-knative-eventing-add-on-source-configuration - "org.kie.kogito.addons.knative.eventing.health-enabled=false\n"+ "quarkus.devservices.enabled=false\n"+ "quarkus.kogito.devservices.enabled=false\n", constants.DefaultHTTPWorkflowPortInt) _ AppPropertyHandler = &appPropertyHandler{} @@ -160,6 +157,11 @@ func NewAppPropertyHandler(workflow *operatorapi.SonataFlow, platform *operatora return nil, err } props.Merge(p) + p, err = generateKnativeEventingWorkflowProperties(workflow) + if err != nil { + return nil, err + } + props.Merge(p) props.Sort() } handler.defaultMutableProperties = props diff --git a/controllers/profiles/common/properties/application_test.go b/controllers/profiles/common/properties/application_test.go index e686aaac6..7133db17e 100644 --- a/controllers/profiles/common/properties/application_test.go +++ b/controllers/profiles/common/properties/application_test.go @@ -124,13 +124,12 @@ func Test_appPropertyHandler_WithUserPropertiesWithNoUserOverrides(t *testing.T) assert.NoError(t, err) generatedProps, propsErr := properties.LoadString(props.WithUserProperties(userProperties).Build()) assert.NoError(t, propsErr) - assert.Equal(t, 7, len(generatedProps.Keys())) + assert.Equal(t, 6, len(generatedProps.Keys())) assert.NotContains(t, "property1", generatedProps.Keys()) assert.NotContains(t, "property2", generatedProps.Keys()) assert.Equal(t, "http://greeting.default", generatedProps.GetString("kogito.service.url", "")) assert.Equal(t, "8080", generatedProps.GetString("quarkus.http.port", "")) assert.Equal(t, "0.0.0.0", generatedProps.GetString("quarkus.http.host", "")) - assert.Equal(t, "false", generatedProps.GetString("org.kie.kogito.addons.knative.eventing.health-enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString("quarkus.kogito.devservices.enabled", "")) assert.Equal(t, "false", generatedProps.GetString(constants.KogitoUserTasksEventsEnabled, "")) @@ -157,7 +156,7 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T Build()) generatedProps.DisableExpansion = true assert.NoError(t, propsErr) - assert.Equal(t, 21, len(generatedProps.Keys())) + assert.Equal(t, 20, len(generatedProps.Keys())) assert.NotContains(t, "property1", generatedProps.Keys()) assert.NotContains(t, "property2", generatedProps.Keys()) assertHasProperty(t, generatedProps, "service1", myService1Address) @@ -181,7 +180,6 @@ func Test_appPropertyHandler_WithUserPropertiesWithServiceDiscovery(t *testing.T assertHasProperty(t, generatedProps, "kogito.service.url", fmt.Sprintf("http://greeting.%s", defaultNamespace)) assertHasProperty(t, generatedProps, "quarkus.http.port", "8080") assertHasProperty(t, generatedProps, "quarkus.http.host", "0.0.0.0") - assertHasProperty(t, generatedProps, "org.kie.kogito.addons.knative.eventing.health-enabled", "false") assertHasProperty(t, generatedProps, "quarkus.devservices.enabled", "false") assertHasProperty(t, generatedProps, "quarkus.kogito.devservices.enabled", "false") assertHasProperty(t, generatedProps, constants.KogitoUserTasksEventsEnabled, "false") @@ -276,7 +274,6 @@ func Test_appPropertyHandler_WithServicesWithUserOverrides(t *testing.T) { assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace+"/v2/jobs/events", generatedProps.GetString(constants.JobServiceRequestEventsURL, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEvents, "")) assert.Equal(t, "", generatedProps.GetString(constants.JobServiceStatusChangeEventsURL, "")) - assert.Equal(t, "http://"+platform.Name+"-"+constants.JobServiceName+"."+platform.Namespace, generatedProps.GetString(constants.KogitoJobServiceURL, "")) // disabling job service bypasses config of outgoing events url platform.Spec.Services.JobService.Enabled = nil diff --git a/controllers/profiles/common/properties/properties.go b/controllers/profiles/common/properties/properties.go new file mode 100644 index 000000000..ec4c3b098 --- /dev/null +++ b/controllers/profiles/common/properties/properties.go @@ -0,0 +1,49 @@ +// Copyright 2024 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package properties + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/profiles/common/constants" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef" + "github.com/magiconair/properties" + cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" +) + +// GenerateKnativeEventingWorkflowProperties returns the set of application properties required for the workflow to produce or consume +// Knative Events. +// Never nil. +func generateKnativeEventingWorkflowProperties(workflow *operatorapi.SonataFlow) (*properties.Properties, error) { + props := properties.NewProperties() + if workflow == nil || workflow.Spec.Sink == nil { + props.Set(constants.KnativeHealthEnabled, "false") + return props, nil + } + // verify ${K_SINK} + props.Set(constants.KnativeHealthEnabled, "true") + if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindProduced) { + props.Set(constants.KogitoOutgoingEventsConnector, constants.QuarkusHTTP) + props.Set(constants.KogitoOutgoingEventsURL, constants.KnativeInjectedEnvVar) + } + if workflowdef.ContainsEventKind(workflow, cncfmodel.EventKindConsumed) { + props.Set(constants.KogitoIncomingEventsConnector, constants.QuarkusHTTP) + var path = "/" + if workflow.Spec.Sink.URI != nil { + path = workflow.Spec.Sink.URI.Path + } + props.Set(constants.KogitoIncomingEventsPath, path) + } + return props, nil +} diff --git a/controllers/profiles/common/reconciler.go b/controllers/profiles/common/reconciler.go index da29a10a7..b2800005b 100644 --- a/controllers/profiles/common/reconciler.go +++ b/controllers/profiles/common/reconciler.go @@ -23,6 +23,8 @@ import ( "context" "fmt" + "k8s.io/client-go/rest" + "github.com/apache/incubator-kie-kogito-serverless-operator/controllers/discovery" "k8s.io/client-go/tools/record" @@ -39,12 +41,13 @@ import ( // StateSupport is the shared structure with common accessors used throughout the whole reconciliation profiles type StateSupport struct { C client.Client + Cfg *rest.Config Catalog discovery.ServiceCatalog Recorder record.EventRecorder } // PerformStatusUpdate updates the SonataFlow Status conditions -func (s StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) { +func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operatorapi.SonataFlow) (bool, error) { var err error workflow.Status.ObservedGeneration = workflow.Generation if err = s.C.Status().Update(ctx, workflow); err != nil { diff --git a/controllers/profiles/dev/profile_dev.go b/controllers/profiles/dev/profile_dev.go index 14dc188ce..0bd5a3970 100644 --- a/controllers/profiles/dev/profile_dev.go +++ b/controllers/profiles/dev/profile_dev.go @@ -46,6 +46,7 @@ func (d developmentProfile) GetProfile() metadata.ProfileType { func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ C: client, + Cfg: cfg, Catalog: discovery.NewServiceCatalogForConfig(client, cfg), Recorder: recorder, } diff --git a/controllers/profiles/dev/states_dev.go b/controllers/profiles/dev/states_dev.go index e186a3589..17bd5a80a 100644 --- a/controllers/profiles/dev/states_dev.go +++ b/controllers/profiles/dev/states_dev.go @@ -117,6 +117,12 @@ func (e *ensureRunningWorkflowState) Do(ctx context.Context, workflow *operatora } objs = append(objs, route) + if knativeObjs, err := common.NewKnativeEventingHandler(e.StateSupport).Ensure(ctx, workflow); err != nil { + return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, objs, err + } else { + objs = append(objs, knativeObjs...) + } + // First time reconciling this object, mark as wait for deployment if workflow.Status.GetTopLevelCondition().IsUnknown() { klog.V(log.I).InfoS("Workflow is in WaitingForDeployment Condition") diff --git a/controllers/profiles/prod/deployment_handler.go b/controllers/profiles/prod/deployment_handler.go index 4581f2cce..af999e5aa 100644 --- a/controllers/profiles/prod/deployment_handler.go +++ b/controllers/profiles/prod/deployment_handler.go @@ -83,7 +83,12 @@ func (d *deploymentReconciler) reconcileWithBuiltImage(ctx context.Context, work return reconcile.Result{}, nil, err } + knativeObjs, err := common.NewKnativeEventingHandler(d.StateSupport).Ensure(ctx, workflow) + if err != nil { + return ctrl.Result{RequeueAfter: constants.RequeueAfterFailure}, nil, err + } objs := []client.Object{deployment, service, managedPropsCM} + objs = append(objs, knativeObjs...) if deploymentOp == controllerutil.OperationResultCreated { workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "") diff --git a/controllers/profiles/prod/profile_prod.go b/controllers/profiles/prod/profile_prod.go index 33932d23d..b33532685 100644 --- a/controllers/profiles/prod/profile_prod.go +++ b/controllers/profiles/prod/profile_prod.go @@ -72,6 +72,7 @@ func newObjectEnsurers(support *common.StateSupport) *objectEnsurers { func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ C: client, + Cfg: cfg, Catalog: discovery.NewServiceCatalogForConfig(client, cfg), Recorder: recorder, } @@ -93,6 +94,7 @@ func NewProfileReconciler(client client.Client, cfg *rest.Config, recorder recor func NewProfileForOpsReconciler(client client.Client, cfg *rest.Config, recorder record.EventRecorder) profiles.ProfileReconciler { support := &common.StateSupport{ C: client, + Cfg: cfg, Catalog: discovery.NewServiceCatalogForConfig(client, cfg), Recorder: recorder, } diff --git a/controllers/workflowdef/services.go b/controllers/workflowdef/services.go new file mode 100644 index 000000000..03db8ef02 --- /dev/null +++ b/controllers/workflowdef/services.go @@ -0,0 +1,29 @@ +// Copyright 2024 Apache Software Foundation (ASF) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workflowdef + +import ( + operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08" + cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model" +) + +func ContainsEventKind(workflow *operatorapi.SonataFlow, eventKind cncfmodel.EventKind) bool { + for _, event := range workflow.Spec.Flow.Events { + if event.Kind == eventKind { + return true + } + } + return false +} diff --git a/main.go b/main.go index a1040f7bf..240a1d2fd 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,9 @@ import ( "flag" "os" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "k8s.io/klog/v2/klogr" "k8s.io/klog/v2" @@ -54,6 +57,8 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(operatorapi.AddToScheme(scheme)) + utilruntime.Must(sourcesv1.AddToScheme(scheme)) + utilruntime.Must(eventingv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } diff --git a/operator.yaml b/operator.yaml index a4693bd67..fb73c1011 100644 --- a/operator.yaml +++ b/operator.yaml @@ -26173,6 +26173,53 @@ spec: type: object type: array type: object + sink: + description: Sink describes the sinkBinding details of this SonataFlow + instance. + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates + in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + If set, these CAs are appended to the set of CAs provided by + the Addressable target, if any. + type: string + ref: + description: Ref points to an Addressable. + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the + group. This can be used as an alternative to the APIVersion, + and then resolved using ResolveGroup. Note: This API is + EXPERIMENTAL and might break anytime. For more details: + https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ + This is optional field, it gets defaulted to the object + holding it if left out.' + type: string + required: + - kind + - name + type: object + uri: + description: URI can be an absolute URL(non-empty scheme and non-empty + host) pointing to the target or a relative URI. Relative URIs + will be resolved using the base URI retrieved from Ref. + type: string + type: object required: - flow type: object @@ -26354,6 +26401,36 @@ rules: - patch - update - watch +- apiGroups: + - eventing.knative.dev + resources: + - triggers + - triggers/status + - triggers/finalizers + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +- apiGroups: + - sources.knative.dev + resources: + - sinkbindings + - sinkbindings/status + - sinkbindings/finalizers + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml new file mode 100644 index 000000000..05e3a5b42 --- /dev/null +++ b/test/testdata/sonataflow.org_v1alpha08_sonataflow_vet_event.yaml @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +apiVersion: sonataflow.org/v1alpha08 +kind: SonataFlow +metadata: + name: vet + annotations: + sonataflow.org/description: Vet service call via events + sonataflow.org/version: 0.0.1 +spec: + sink: + ref: + name: default + namespace: default + apiVersion: eventing.knative.dev/v1 + kind: Broker + flow: + events: + - name: MakeVetAppointment + source: VetServiceSource + type: events.vet.appointments + kind: produced + - name: VetAppointmentInfo + source: VetServiceSource + type: events.vet.appointments + - name: VetAppointmentRequestReceived + source: checkAccountInfo + type: events.vet.appointments.request + functions: + - name: StoreNewPatientInfo + operation: specs/services.yaml#checkAccountInfo + states: + - name: AppointmentRequestReceived + type: event + onEvents: + - eventRefs: + - VetAppointmentRequestReceived + actions: + - name: checkAccount + functionRef: + refName: checkAccountInfo + arguments: + account: "${ .accountId }" + transition: MakeVetAppointmentState + - name: MakeVetAppointmentState + type: callback + action: + name: MakeAppointmentAction + eventRef: + triggerEventRef: MakeVetAppointment + data: "${ .patientInfo }" + eventRef: VetAppointmentInfo + timeouts: + stateExecTimeout: PT15M + eventDataFilter: + toStateData: .test + end: true diff --git a/test/yaml.go b/test/yaml.go index 5638e53a1..8c3692a6f 100644 --- a/test/yaml.go +++ b/test/yaml.go @@ -44,6 +44,7 @@ const ( SonataFlowGreetingsWithDataInputSchemaCR = "sonataflow.org_v1alpha08_sonataflow_greetings_datainput.yaml" SonataFlowGreetingsWithStaticResourcesCR = "sonataflow.org_v1alpha08_sonataflow-metainf.yaml" SonataFlowSimpleOpsYamlCR = "sonataflow.org_v1alpha08_sonataflow-simpleops.yaml" + SonataFlowVetWithEventCR = "sonataflow.org_v1alpha08_sonataflow_vet_event.yaml" SonataFlowGreetingsDataInputSchemaConfig = "v1_configmap_greetings_datainput.yaml" SonataFlowGreetingsStaticFilesConfig = "v1_configmap_greetings_staticfiles.yaml" sonataFlowPlatformYamlCR = "sonataflow.org_v1alpha08_sonataflowplatform.yaml" @@ -202,6 +203,10 @@ func GetBaseSonataFlow(namespace string, options ...*func(*operatorapi.SonataFlo return NewSonataFlow(sonataFlowSampleYamlCR, namespace) } +func GetVetEventSonataFlow(namespace string) *operatorapi.SonataFlow { + return GetSonataFlow(SonataFlowVetWithEventCR, namespace) +} + func GetBaseSonataFlowWithDevProfile(namespace string) *operatorapi.SonataFlow { return NewSonataFlow(sonataFlowSampleYamlCR, namespace, SetDevProfile) }