Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KOGITO-7754] create Knative eventing resources #350

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ type SonataFlowSpec struct {
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
// Persistence defines the database persistence configuration for the workflow
Persistence *PersistenceOptionsSpec `json:"persistence,omitempty"`
// Sink describes the sinkBinding details of this SonataFlow instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
Sink *duckv1.Destination `json:"sink,omitempty"`
}

// SonataFlowStatus defines the observed state of SonataFlow
Expand Down
6 changes: 6 additions & 0 deletions api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down Expand Up @@ -400,6 +403,36 @@ spec:
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
47 changes: 47 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9346,6 +9346,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
47 changes: 47 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9347,6 +9347,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down
30 changes: 30 additions & 0 deletions config/rbac/builder_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ rules:
resources:
- roles
- rolebindings
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
Expand Down
7 changes: 7 additions & 0 deletions controllers/profiles/common/constants/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ package constants

const (
MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
KogitoOutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url"
KogitoOutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
KogitoIncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector"
KogitoIncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path"
KnativeHealthEnabled = "org.kie.kogito.addons.knative.eventing.health-enabled"
KnativeInjectedEnvVar = "${K_SINK}"
KnativeEventingBrokerDefault = "default"
)
75 changes: 61 additions & 14 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}
var _ ObjectsEnsurer = &defaultObjectsEnsurer{}

type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Expand Down Expand Up @@ -77,22 +78,10 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi
result := controllerutil.OperationResultNone

object, err := d.creator(workflow)
if err != nil {
return nil, result, err
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
}); err != nil {
if err != nil || object == nil {
return nil, result, err
RichardW98 marked this conversation as resolved.
Show resolved Hide resolved
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
return ensureObject(ctx, workflow, visitors, result, d.c, object)
}

// defaultObjectEnsurerWithPlatform is the equivalent of defaultObjectEnsurer for resources that require a reference to the SonataFlowPlatform
Expand Down Expand Up @@ -136,3 +125,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.So
result := controllerutil.OperationResultNone
return nil, result, nil
}

// ObjectsEnsurer is an ensurer to apply multiple objects
type ObjectsEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
}

type ObjectEnsurerResult struct {
client.Object
Result controllerutil.OperationResult
Error error
}

func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
return &defaultObjectsEnsurer{
c: client,
creator: creator,
}
}

type defaultObjectsEnsurer struct {
ObjectsEnsurer
c client.Client
creator ObjectsCreator
}

func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
result := controllerutil.OperationResultNone

objects, err := d.creator(workflow)
if err != nil {
return []ObjectEnsurerResult{{nil, result, err}}
}
var ensureResult []ObjectEnsurerResult
for _, object := range objects {
ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, d.c, object)
ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
if err != nil {
return ensureResult
}
}
return ensureResult
}

func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
if result, err := controllerutil.CreateOrPatch(ctx, c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
}
Loading
Loading