Skip to content

Commit

Permalink
[KOGITO-7754] create Knative eventing resources (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardW98 committed Feb 28, 2024
1 parent a48a696 commit 76ef902
Show file tree
Hide file tree
Showing 25 changed files with 699 additions and 23 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ type SonataFlowSpec struct {
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
// Persistence defines the database persistence configuration for the workflow
Persistence *PersistenceOptionsSpec `json:"persistence,omitempty"`
// Sink describes the sinkBinding details of this SonataFlow instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
Sink *duckv1.Destination `json:"sink,omitempty"`
}

// SonataFlowStatus defines the observed state of SonataFlow
Expand Down
6 changes: 6 additions & 0 deletions api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down Expand Up @@ -400,6 +403,36 @@ spec:
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
47 changes: 47 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9346,6 +9346,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
47 changes: 47 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9347,6 +9347,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down
30 changes: 30 additions & 0 deletions config/rbac/builder_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ rules:
resources:
- roles
- rolebindings
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
Expand Down
7 changes: 7 additions & 0 deletions controllers/profiles/common/constants/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ package constants

const (
MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
KogitoOutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url"
KogitoOutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
KogitoIncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector"
KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path"
KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled"
KnativeInjectedEnvVar = "${K_SINK}"
KnativeEventingBrokerDefault = "default"
)
75 changes: 61 additions & 14 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}
var _ ObjectsEnsurer = &defaultObjectsEnsurer{}

type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Expand Down Expand Up @@ -77,22 +78,10 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi
result := controllerutil.OperationResultNone

object, err := d.creator(workflow)
if err != nil {
return nil, result, err
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
}); err != nil {
if err != nil || object == nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
return ensureObject(ctx, workflow, visitors, result, d.c, object)
}

// defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform
Expand Down Expand Up @@ -136,3 +125,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.So
result := controllerutil.OperationResultNone
return nil, result, nil
}

// ObjectsEnsurer is an ensurer to apply multiple objects
type ObjectsEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
}

type ObjectEnsurerResult struct {
client.Object
Result controllerutil.OperationResult
Error error
}

func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
return &defaultObjectsEnsurer{
c: client,
creator: creator,
}
}

type defaultObjectsEnsurer struct {
ObjectsEnsurer
c client.Client
creator ObjectsCreator
}

func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
result := controllerutil.OperationResultNone

objects, err := d.creator(workflow)
if err != nil {
return []ObjectEnsurerResult{{nil, result, err}}
}
var ensureResult []ObjectEnsurerResult
for _, object := range objects {
ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object)
ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
if err != nil {
return ensureResult
}
}
return ensureResult
}

func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
if result, err := controllerutil.CreateOrPatch(ctx, c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
}
Loading

0 comments on commit 76ef902

Please sign in to comment.