Skip to content

Commit

Permalink
[KOGITO-7754] create Knative eventing resources
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardW98 committed Jan 18, 2024
1 parent 64f688b commit 189ff49
Show file tree
Hide file tree
Showing 28 changed files with 1,037 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ docker-buildx: test ## Build and push docker image for the manager for cross-pla
sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
- docker buildx create --name project-v3-builder
docker buildx use project-v3-builder
- docker buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross
- docker buildx build --platform=$(PLATFORMS) --push . -f Dockerfile.cross --tag ${IMG}
- docker buildx rm project-v3-builder
rm Dockerfile.cross

Expand Down
3 changes: 3 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,9 @@ type SonataFlowSpec struct {
// PodTemplate describes the deployment details of this SonataFlow instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="podTemplate"
PodTemplate PodTemplateSpec `json:"podTemplate,omitempty"`
// Sink describes the sinkBinding details of this SonataFlow instance.
//+operator-sdk:csv:customresourcedefinitions:type=spec,displayName="sink"
Sink *duckv1.Destination `json:"sink,omitempty"`
}

// SonataFlowStatus defines the observed state of SonataFlow
Expand Down
6 changes: 6 additions & 0 deletions api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down Expand Up @@ -347,6 +350,36 @@ spec:
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- coordination.k8s.io
resources:
Expand Down
47 changes: 47 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9287,6 +9287,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
47 changes: 47 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9288,6 +9288,53 @@ spec:
type: object
type: array
type: object
sink:
description: Sink describes the sinkBinding details of this SonataFlow
instance.
properties:
CACerts:
description: CACerts are Certification Authority (CA) certificates
in PEM format according to https://www.rfc-editor.org/rfc/rfc7468.
If set, these CAs are appended to the set of CAs provided by
the Addressable target, if any.
type: string
ref:
description: Ref points to an Addressable.
properties:
address:
description: Address points to a specific Address Name.
type: string
apiVersion:
description: API version of the referent.
type: string
group:
description: 'Group of the API, without the version of the
group. This can be used as an alternative to the APIVersion,
and then resolved using ResolveGroup. Note: This API is
EXPERIMENTAL and might break anytime. For more details:
https://github.com/knative/eventing/issues/5086'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
This is optional field, it gets defaulted to the object
holding it if left out.'
type: string
required:
- kind
- name
type: object
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty
host) pointing to the target or a relative URI. Relative URIs
will be resolved using the base URI retrieved from Ref.
type: string
type: object
required:
- flow
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ spec:
definition. For example, a collection of OpenAPI specification files.
displayName: resources
path: resources
- description: Sink describes the sinkBinding details of this SonataFlow instance.
displayName: sink
path: sink
statusDescriptors:
- description: Address is used as a part of Addressable interface (status.address.url)
for knative
Expand Down
30 changes: 30 additions & 0 deletions config/rbac/builder_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ rules:
resources:
- roles
- rolebindings
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- eventing.knative.dev
resources:
- triggers
- triggers/status
- triggers/finalizers
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
- apiGroups:
- sources.knative.dev
resources:
- sinkbindings
- sinkbindings/status
- sinkbindings/finalizers
verbs:
- create
- delete
Expand Down
16 changes: 8 additions & 8 deletions controllers/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,33 +80,33 @@ type ServiceCatalog interface {
Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error)
}

type sonataFlowServiceCatalog struct {
type SonataFlowServiceCatalog struct {
kubernetesCatalog ServiceCatalog
knativeCatalog ServiceCatalog
KnativeCatalog ServiceCatalog
openshiftCatalog ServiceCatalog
}

// NewServiceCatalog returns a new ServiceCatalog configured to resolve kubernetes, knative, and openshift resource addresses.
func NewServiceCatalog(cli client.Client, knDiscoveryClient *KnDiscoveryClient) ServiceCatalog {
return &sonataFlowServiceCatalog{
return &SonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(knDiscoveryClient),
KnativeCatalog: newKnServiceCatalog(knDiscoveryClient),
}
}

func NewServiceCatalogForConfig(cli client.Client, cfg *rest.Config) ServiceCatalog {
return &sonataFlowServiceCatalog{
return &SonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalogForConfig(cfg),
KnativeCatalog: newKnServiceCatalogForConfig(cfg),
}
}

func (c *sonataFlowServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error) {
func (c *SonataFlowServiceCatalog) Query(ctx context.Context, uri ResourceUri, outputFormat string) (string, error) {
switch uri.Scheme {
case KubernetesScheme:
return c.kubernetesCatalog.Query(ctx, uri, outputFormat)
case KnativeScheme:
return c.knativeCatalog.Query(ctx, uri, outputFormat)
return c.KnativeCatalog.Query(ctx, uri, outputFormat)
case OpenshiftScheme:
return "", fmt.Errorf("openshift service discovery is not yet implemented")
default:
Expand Down
6 changes: 6 additions & 0 deletions controllers/profiles/common/constants/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,10 @@ package constants

const (
MicroprofileServiceCatalogPropertyPrefix = "org.kie.kogito.addons.discovery."
OutgoingEventsURL = "mp.messaging.outgoing.kogito_outgoing_stream.url"
OutgoingEventsConnector = "mp.messaging.outgoing.kogito_outgoing_stream.connector"
IncomingEventsConnector = "mp.messaging.incoming.kogito_incoming_stream.connector"
IncomingEventsPath = "mp.messaging.incoming.kogito_incoming_stream.path"
KnativeInjectedEnvVar = "${K_SINK}"
KnativeEventingBrokerDefault = "default"
)
75 changes: 61 additions & 14 deletions controllers/profiles/common/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

var _ ObjectEnsurer = &defaultObjectEnsurer{}
var _ ObjectEnsurer = &noopObjectEnsurer{}
var _ ObjectsEnsurer = &defaultObjectsEnsurer{}

type ObjectEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) (client.Object, controllerutil.OperationResult, error)
Expand Down Expand Up @@ -66,22 +67,10 @@ func (d *defaultObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi
result := controllerutil.OperationResultNone

object, err := d.creator(workflow)
if err != nil {
if err != nil || object == nil {
return nil, result, err
}
if result, err = controllerutil.CreateOrPatch(ctx, d.c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, d.c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
return ensureObject(ctx, workflow, visitors, result, err, d.c, object)
}

// NewNoopObjectEnsurer see noopObjectEnsurer
Expand All @@ -97,3 +86,61 @@ func (d *noopObjectEnsurer) Ensure(ctx context.Context, workflow *operatorapi.So
result := controllerutil.OperationResultNone
return nil, result, nil
}

// ObjectsEnsurer is an ensurer to apply multiple objects
type ObjectsEnsurer interface {
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult
}

type ObjectEnsurerResult struct {
client.Object
Result controllerutil.OperationResult
Error error
}

func NewObjectsEnsurer(client client.Client, creator ObjectsCreator) ObjectsEnsurer {
return &defaultObjectsEnsurer{
c: client,
creator: creator,
}
}

type defaultObjectsEnsurer struct {
ObjectsEnsurer
c client.Client
creator ObjectsCreator
}

func (d *defaultObjectsEnsurer) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow, visitors ...MutateVisitor) []ObjectEnsurerResult {
result := controllerutil.OperationResultNone

objects, err := d.creator(workflow)
if err != nil {
return []ObjectEnsurerResult{{nil, result, err}}
}
var ensureResult []ObjectEnsurerResult
for _, object := range objects {
ensureObject, c, err := ensureObject(ctx, workflow, visitors, result, err, d.c, object)
ensureResult = append(ensureResult, ObjectEnsurerResult{ensureObject, c, err})
if err != nil {
return ensureResult
}
}
return ensureResult
}

func ensureObject(ctx context.Context, workflow *operatorapi.SonataFlow, visitors []MutateVisitor, result controllerutil.OperationResult, err error, c client.Client, object client.Object) (client.Object, controllerutil.OperationResult, error) {
if result, err = controllerutil.CreateOrPatch(ctx, c, object,
func() error {
for _, v := range visitors {
if visitorErr := v(object)(); visitorErr != nil {
return visitorErr
}
}
return controllerutil.SetControllerReference(workflow, object, c.Scheme())
}); err != nil {
return nil, result, err
}
klog.V(log.I).InfoS("Object operation finalized", "result", result, "kind", object.GetObjectKind().GroupVersionKind().String(), "name", object.GetName(), "namespace", object.GetNamespace())
return object, result, nil
}
Loading

0 comments on commit 189ff49

Please sign in to comment.