Skip to content

Commit

Permalink
kie-issues-314: Operator driven service discovery API Phase3
Browse files Browse the repository at this point in the history
kie-issues-314: Operator driven service discovery API Phase3
    - Adds the processing of workflow functions to generate the service discovery for knative functions invokations

kie-issues-314: Operator driven service discovery API Phase3
    - Code review suggestions 1

kie-issues-314: Operator driven service discovery API Phase3
    - Code review suggestions 2
  • Loading branch information
wmedvede committed Dec 21, 2023
1 parent 08c8854 commit 5733069
Show file tree
Hide file tree
Showing 39 changed files with 3,634 additions and 716 deletions.
36 changes: 36 additions & 0 deletions bundle/manifests/sonataflow-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,42 @@ spec:
- subjectaccessreviews
verbs:
- create
- apiGroups:
- apps
resources:
- statefulset
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingress
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- serving.knative.dev
resources:
- service
- services
verbs:
- get
- list
- watch
- apiGroups:
- eventing.knative.dev
resources:
- broker
- brokers
verbs:
- get
- list
- watch
serviceAccountName: sonataflow-operator-controller-manager
deployments:
- label:
Expand Down
2 changes: 2 additions & 0 deletions config/rbac/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ resources:
- openshift_role_binding.yaml
- operator_role_leases.yaml
- operator_role_binding_leases.yaml
- service_discovery_role.yaml
- service_discovery_role_binding.yaml
# Comment the following 4 lines if you want to disable
# the auth proxy (https://github.com/brancz/kube-rbac-proxy)
# which protects your /metrics endpoint.
Expand Down
42 changes: 42 additions & 0 deletions config/rbac/service_discovery_role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: service-discovery-role
rules:
- apiGroups:
- apps
resources:
- statefulset
- statefulsets
verbs:
- get
- list
- watch
- apiGroups:
- networking.k8s.io
resources:
- ingress
- ingresses
verbs:
- get
- list
- watch
- apiGroups:
- serving.knative.dev
resources:
- service
- services
verbs:
- get
- list
- watch
- apiGroups:
- eventing.knative.dev
resources:
- broker
- brokers
verbs:
- get
- list
- watch
14 changes: 14 additions & 0 deletions config/rbac/service_discovery_role_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: service-discovery-rolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: service-discovery-role
subjects:
- kind: ServiceAccount
name: controller-manager
namespace: system

16 changes: 13 additions & 3 deletions controllers/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/rest"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -52,6 +54,7 @@ const (

// knative groups
knativeServices = "knative:services.v1.serving.knative.dev"
knativeBrokers = "knative:brokers.v1.eventing.knative.dev"

// openshift groups
openshiftRoutes = "openshift:routes.v1.route.openshift.io"
Expand Down Expand Up @@ -84,10 +87,17 @@ type sonataFlowServiceCatalog struct {
}

// NewServiceCatalog returns a new ServiceCatalog configured to resolve kubernetes, knative, and openshift resource addresses.
func NewServiceCatalog(cli client.Client) ServiceCatalog {
func NewServiceCatalog(cli client.Client, knDiscoveryClient *KnDiscoveryClient) ServiceCatalog {
return &sonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(knDiscoveryClient),
}
}

func NewServiceCatalogForConfig(cli client.Client, cfg *rest.Config) ServiceCatalog {
return &sonataFlowServiceCatalog{
kubernetesCatalog: newK8SServiceCatalog(cli),
knativeCatalog: newKnServiceCatalog(cli),
knativeCatalog: newKnServiceCatalogForConfig(cfg),
}
}

Expand All @@ -96,7 +106,7 @@ func (c *sonataFlowServiceCatalog) Query(ctx context.Context, uri ResourceUri, o
case KubernetesScheme:
return c.kubernetesCatalog.Query(ctx, uri, outputFormat)
case KnativeScheme:
return "", fmt.Errorf("knative service discovery is not yet implemened")
return c.knativeCatalog.Query(ctx, uri, outputFormat)
case OpenshiftScheme:
return "", fmt.Errorf("openshift service discovery is not yet implemented")
default:
Expand Down
125 changes: 125 additions & 0 deletions controllers/discovery/discovery_knative_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package discovery

import (
"context"
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"

duckv1 "knative.dev/pkg/apis/duck/v1"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"

servingv1 "knative.dev/serving/pkg/apis/serving/v1"
fakeservingclient "knative.dev/serving/pkg/client/injection/client/fake"
)

func Test_QueryKnativeService(t *testing.T) {
doTestQueryKnativeService(t, "http://knServiceName1.namespace1.svc.cluster.local")
}

func Test_QueryKnativeServiceNotFound(t *testing.T) {
_, client := fakeservingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(client.ServingV1(), nil))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knServiceName1).Build(), "", fmt.Sprintf("services.serving.knative.dev %q not found", knServiceName1))
}

func doTestQueryKnativeService(t *testing.T, expectedUri string) {
service := &servingv1.Service{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace1,
Name: knServiceName1,
},
Spec: servingv1.ServiceSpec{},
Status: servingv1.ServiceStatus{
RouteStatusFields: servingv1.RouteStatusFields{
Address: &duckv1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: knServiceName1 + "." + namespace1 + ".svc.cluster.local",
},
},
},
},
}
_, client := fakeservingclient.With(context.TODO(), service)
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(client.ServingV1(), nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("services").
Group("serving.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knServiceName1).Build(), "", expectedUri)
}

func Test_QueryKnativeBroker(t *testing.T) {
doTestQueryKnativeBroker(t, "http://broker-ingress.knative-eventing.svc.cluster.local/namespace1/knBrokerName1")
}

func Test_QueryKnativeBrokerNotFound(t *testing.T) {
_, client := fakeeventingclient.With(context.TODO())
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(nil, client.EventingV1()))
doTestQueryWithError(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knBrokerName1).Build(), "", fmt.Sprintf("brokers.eventing.knative.dev %q not found", knBrokerName1))
}

func doTestQueryKnativeBroker(t *testing.T, expectedUri string) {
broker := &eventingv1.Broker{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace1,
Name: knBrokerName1,
},
Spec: eventingv1.BrokerSpec{},
Status: eventingv1.BrokerStatus{
Address: duckv1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "broker-ingress.knative-eventing.svc.cluster.local",
Path: "/" + namespace1 + "/" + knBrokerName1,
},
},
},
}
_, client := fakeeventingclient.With(context.TODO(), broker)
ctg := NewServiceCatalog(nil, newKnDiscoveryClient(nil, client.EventingV1()))
doTestQuery(t, ctg, *NewResourceUriBuilder(KnativeScheme).
Kind("brokers").
Group("eventing.knative.dev").
Version("v1").
Namespace(namespace1).
Name(knBrokerName1).Build(), "", expectedUri)
}
14 changes: 7 additions & 7 deletions controllers/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func doTestQueryKubernetesService(t *testing.T, outputFormat string, expectedUri
service.Spec.Type = corev1.ServiceTypeNodePort
service.Spec.ClusterIP = "10.1.5.18"
cli := fake.NewClientBuilder().WithRuntimeObjects(service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("services").
Version("v1").
Expand All @@ -86,7 +86,7 @@ func doTestQueryKubernetesPod(t *testing.T, outputFormat string, expectedUri str
*mockContainerWithPorts("container1Name", mockContainerPort(httpProtocol, tcp, defaultHttpPort)))
pod.Status.PodIP = "10.1.12.13"
cli := fake.NewClientBuilder().WithRuntimeObjects(pod).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("pods").
Version("v1").
Expand Down Expand Up @@ -116,7 +116,7 @@ func doTesQueryKubernetesDeploymentWithService(t *testing.T, outputFormat string
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(deployment, service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -142,7 +142,7 @@ func doTestQueryKubernetesDeploymentWithoutService(t *testing.T, outputFormat st
}

deployment := mockDeployment(namespace1, deployment1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build())
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(deployment).Build(), newKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -176,7 +176,7 @@ func doTestQueryKubernetesStatefulSetWithService(t *testing.T, outputFormat stri
service.Spec.Type = corev1.ServiceTypeNodePort

cli := fake.NewClientBuilder().WithRuntimeObjects(statefulSet, service).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))

doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand All @@ -202,7 +202,7 @@ func doTestQueryKubernetesStatefulSetWithoutService(t *testing.T, outputFormat s
}

statefulSet := mockStatefulSet(namespace1, statefulSet1Name, nil, &selector)
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build())
ctg := NewServiceCatalog(fake.NewClientBuilder().WithRuntimeObjects(statefulSet).Build(), newKnDiscoveryClient(nil, nil))

uri := *NewResourceUriBuilder(KubernetesScheme).
Group("apps").
Expand Down Expand Up @@ -237,7 +237,7 @@ func doTestQueryKubernetesIngress(t *testing.T, hostName string, ip string, tls
ingress.Spec.TLS = []v1.IngressTLS{{}}
}
cli := fake.NewClientBuilder().WithRuntimeObjects(ingress).Build()
ctg := NewServiceCatalog(cli)
ctg := NewServiceCatalog(cli, newKnDiscoveryClient(nil, nil))
doTestQuery(t, ctg, *NewResourceUriBuilder(KubernetesScheme).
Kind("ingresses").
Group("networking.k8s.io").
Expand Down
Loading

0 comments on commit 5733069

Please sign in to comment.