Skip to content

Commit

Permalink
[KOGITO-7754] create Knative eventing resources
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardW98 committed Jan 26, 2024
1 parent 2b5056d commit fcfaddc
Show file tree
Hide file tree
Showing 24 changed files with 706 additions and 22 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,9 @@ type SonataFlowSpec struct {
// Persists service to a datasource of choice. Ephemeral by default.
// +optional
Persistence *PersistenceOptions `json:"persistence,omitempty"`
// Sink describes the sinkBinding details of this SonataFlow instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
Sink *duckv1.Destination `json:"sink,omitempty"`
}

// SonataFlowStatus defines the observed state of SonataFlow
Expand Down
33 changes: 33 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down Expand Up @@ -347,6 +350,36 @@ spec:
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
47 changes: 47 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9346,6 +9346,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
47 changes: 47 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9347,6 +9347,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down
30 changes: 30 additions & 0 deletions config/rbac/builder_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ rules:
resources:
- roles
- rolebindings
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
Expand Down
7 changes: 7 additions & 0 deletions controllers/profiles/common/constants/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ package constants

const (
MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
KogitoOutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url"
KogitoOutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
KogitoIncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector"
KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path"
KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled"
KnativeInjectedEnvVar = "${K_SINK}"
KnativeEventingBrokerDefault = "default"
)
75 changes: 61 additions & 14 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}
var _ ObjectsEnsurer = &defaultObjectsEnsurer{}

type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Expand Down Expand Up @@ -66,22 +67,10 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi
result := controllerutil.OperationResultNone

object, err := d.creator(workflow)
if err != nil {
if err != nil || object == nil {
return nil, result, err
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
return ensureObject(ctx, workflow, visitors, result, d.c, object)
}

// NewNoopObjectEnsurer see noopObjectEnsurer
Expand All @@ -97,3 +86,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.So
result := controllerutil.OperationResultNone
return nil, result, nil
}

// ObjectsEnsurer is an ensurer to apply multiple objects
type ObjectsEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
}

type ObjectEnsurerResult struct {
client.Object
Result controllerutil.OperationResult
Error error
}

func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
return &defaultObjectsEnsurer{
c: client,
creator: creator,
}
}

type defaultObjectsEnsurer struct {
ObjectsEnsurer
c client.Client
creator ObjectsCreator
}

func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
result := controllerutil.OperationResultNone

objects, err := d.creator(workflow)
if err != nil {
return []ObjectEnsurerResult{{nil, result, err}}
}
var ensureResult []ObjectEnsurerResult
for _, object := range objects {
ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object)
ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
if err != nil {
return ensureResult
}
}
return ensureResult
}

func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
if result, err := controllerutil.CreateOrPatch(ctx, c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
}
76 changes: 76 additions & 0 deletions controllers/profiles/common/knative.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 Apache Software Foundation (ASF)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ KnativeEventingHandler = &knativeObjectManager{}

type knativeObjectManager struct {
sinkBinding ObjectEnsurer
trigger ObjectsEnsurer
*StateSupport
}

func NewKnativeEventingHandler(support *StateSupport) KnativeEventingHandler {
return &knativeObjectManager{
sinkBinding: NewObjectEnsurer(support.C, SinkBindingCreator),
trigger: NewObjectsEnsurer(support.C, TriggersCreator),
StateSupport: support,
}
}

type KnativeEventingHandler interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error)
}

func (k knativeObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) {
var objs []client.Object

if workflow.Spec.Flow.Events == nil {
// skip if no event is found
klog.V(log.I).InfoS("skip knative resource creation as no event is found")
} else if workflow.Spec.Sink == nil {
klog.V(log.I).InfoS("Spec.Sink is not provided")
} else if knativeAvail, err := knative.GetKnativeAvailability(k.Cfg); err != nil || knativeAvail == nil || !knativeAvail.Eventing {
klog.V(log.I).InfoS("Knative Eventing is not installed")
} else {
// create sinkBinding and trigger
sinkBinding, _, err := k.sinkBinding.Ensure(ctx, workflow)
if err != nil {
return objs, err
} else if sinkBinding != nil {
objs = append(objs, sinkBinding)
}

triggers := k.trigger.Ensure(ctx, workflow)
for _, trigger := range triggers {
if trigger.Error != nil {
return objs, trigger.Error
}
objs = append(objs, trigger.Object)
}
}
return objs, nil
}
Loading

0 comments on commit fcfaddc

Please sign in to comment.