Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix wo…
Browse files Browse the repository at this point in the history
…rkflow deletion hanging issue
  • Loading branch information
jianrongzhang89 committed Sep 19, 2024
1 parent b5ad20a commit 3c59b09
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 7 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.9.2
KIND_VERSION ?= v0.20.0
KNATIVE_VERSION ?= v1.13.2
TIMEOUT_SECS ?= 180s
PROMETHEUS_VERSION ?= v0.70.0

KNATIVE_SERVING_PREFIX ?= "https://github.com/knative/serving/releases/download/knative-$(KNATIVE_VERSION)"
KNATIVE_EVENTING_PREFIX ?= "https://github.com/knative/eventing/releases/download/knative-$(KNATIVE_VERSION)"
Expand Down Expand Up @@ -365,6 +366,13 @@ deploy-knative: create-cluster
kubectl apply -f ./test/testdata/knative_serving_eventing.yaml
kubectl wait --for=condition=Ready=True KnativeServing/knative-serving -n knative-serving --timeout=$(TIMEOUT_SECS)
kubectl wait --for=condition=Ready=True KnativeEventing/knative-eventing -n knative-eventing --timeout=$(TIMEOUT_SECS)

# deploy-prometheus installs the Prometheus Operator bundle for $(PROMETHEUS_VERSION) into the
# cluster provided by the create-cluster prerequisite, waits (up to $(TIMEOUT_SECS)) for the
# prometheus-operator deployment in the default namespace to become Available, then applies
# ./test/testdata/prometheus.yaml and waits for the resulting prometheus/prometheus resource
# to become Available as well.
.PHONY: deploy-prometheus
deploy-prometheus: create-cluster
kubectl create -f https://github.com/prometheus-operator/prometheus-operator/releases/download/$(PROMETHEUS_VERSION)/bundle.yaml
kubectl wait --for=condition=Available=True deploy/prometheus-operator -n default --timeout=$(TIMEOUT_SECS)
kubectl apply -f ./test/testdata/prometheus.yaml
kubectl wait --for=condition=Available=True prometheus/prometheus -n default --timeout=$(TIMEOUT_SECS)

.PHONY: delete-cluster
delete-cluster: install-kind
Expand Down
2 changes: 1 addition & 1 deletion api/v1alpha08/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ metadata:
operators.operatorframework.io/project_layout: go.kubebuilder.io/v3
repository: https://github.com/apache/incubator-kie-kogito-serverless-operator
support: Red Hat
name: sonataflow-operator.v999.0.0-snapshot
name: sonataflow-operator.v0.0.104
namespace: placeholder
spec:
apiservicedefinitions: {}
Expand Down Expand Up @@ -777,7 +777,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: docker.io/apache/incubator-kie-sonataflow-operator:latest
image: quay.io/jianrzha/kogito-serverless-operator:0.0.104
livenessProbe:
httpGet:
path: /healthz
Expand Down Expand Up @@ -899,4 +899,4 @@ spec:
minKubeVersion: 1.23.0
provider:
name: Red Hat
version: 999.0.0-snapshot
version: 0.0.104
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: docker.io/apache/incubator-kie-sonataflow-operator
newTag: latest
newName: quay.io/jianrzha/kogito-serverless-operator
newTag: 0.0.104
# Patching the manager deployment file to add an env var with the operator namespace in
patchesJson6902:
- patch: |-
Expand Down
2 changes: 1 addition & 1 deletion container-builder/api/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions controllers/knative/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package knative

import (
"k8s.io/client-go/rest"
)

// MonitoringAvailability describes which monitoring back ends were detected in the cluster.
// The zero value means that nothing was detected.
type MonitoringAvailability struct {
	// Prometheus is true when the Prometheus API group was found on the API server.
	Prometheus bool
	// Grafana is meant to report whether Grafana was found on the API server.
	Grafana bool
}

const (
	// prometheusGroup is the API group served by the Prometheus Operator. Discovery returns
	// API group names (for example "monitoring.coreos.com"), not CRD names such as
	// "prometheuses.monitoring.coreos.com", so the group name is what must be compared.
	prometheusGroup = "monitoring.coreos.com"
	// grafanaGroup is the API group served by the Grafana Operator.
	// NOTE(review): this is the group used by Grafana Operator v5; confirm the targeted version.
	grafanaGroup = "grafana.integreatly.org"
)

// GetMonitoringAvailability inspects the API groups exposed by the API server reachable through
// cfg and reports which monitoring back ends are installed.
//
// It returns an error when the discovery client cannot be created or when the list of server
// groups cannot be retrieved; in both cases the returned availability is nil.
func GetMonitoringAvailability(cfg *rest.Config) (*MonitoringAvailability, error) {
	cli, err := getDiscoveryClient(cfg)
	if err != nil {
		return nil, err
	}
	apiList, err := cli.ServerGroups()
	if err != nil {
		return nil, err
	}
	result := new(MonitoringAvailability)
	for _, group := range apiList.Groups {
		switch group.Name {
		case prometheusGroup:
			result.Prometheus = true
		case grafanaGroup:
			// Grafana availability must depend on the Grafana group, not on Knative Eventing.
			result.Grafana = true
		}
	}
	return result, nil
}
76 changes: 76 additions & 0 deletions controllers/profiles/common/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 Apache Software Foundation (ASF)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"context"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Compile-time check that *monitoringObjectManager implements MonitoringEventingHandler.
var _ MonitoringEventingHandler = (*monitoringObjectManager)(nil)

// monitoringObjectManager is the MonitoringEventingHandler implementation of this package.
// It keeps the ObjectEnsurer used for the workflow ServiceMonitor and embeds the shared
// StateSupport so that its fields (such as the REST config) are directly accessible.
type monitoringObjectManager struct {
	serviceMonitor ObjectEnsurer
	*StateSupport
}

// NewMonitoringEventingHandler builds a MonitoringEventingHandler backed by the given
// StateSupport. The handler ensures ServiceMonitor objects through an ObjectEnsurer created
// from support.C and ServiceMonitorCreator.
func NewMonitoringEventingHandler(support *StateSupport) MonitoringEventingHandler {
	manager := new(monitoringObjectManager)
	manager.serviceMonitor = NewObjectEnsurer(support.C, ServiceMonitorCreator)
	manager.StateSupport = support
	return manager
}

// MonitoringEventingHandler ensures the monitoring related objects of a workflow.
type MonitoringEventingHandler interface {
// Ensure makes sure the monitoring objects for the given workflow exist and returns the
// objects it handled, or an error when they could not be ensured.
Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error)
}

// Ensure makes sure the monitoring objects of the given workflow exist.
//
// It first checks, through knative.GetMonitoringAvailability, whether Prometheus is available in
// the cluster. When the check fails the error is returned. When Prometheus is not installed
// nothing is created and (nil, nil) is returned. Otherwise the workflow ServiceMonitor is
// ensured and returned in the resulting slice.
func (k monitoringObjectManager) Ensure(ctx context.Context, workflow *operatorapi.SonataFlow) ([]client.Object, error) {
	monitoringAvail, err := knative.GetMonitoringAvailability(k.Cfg)
	if err != nil {
		// InfoS expects a constant message followed by key/value pairs, not a printf format.
		klog.V(log.I).InfoS("Error checking monitoring availability", "error", err)
		return nil, err
	}
	if !monitoringAvail.Prometheus {
		klog.V(log.I).InfoS("Prometheus is not installed")
		return nil, nil
	}

	var objs []client.Object
	serviceMonitor, _, err := k.serviceMonitor.Ensure(ctx, workflow)
	if err != nil {
		return objs, err
	}
	if serviceMonitor != nil {
		objs = append(objs, serviceMonitor)
	}
	return objs, nil
}
34 changes: 34 additions & 0 deletions controllers/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"

monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"

"github.com/imdario/mergo"
Expand Down Expand Up @@ -64,6 +65,8 @@ const (
deploymentKind = "Deployment"
k8sServiceAPIVersion = "v1"
k8sServiceKind = "Service"
k8sServicePortName = "web"
k8sServicePortPath = "/q/metrics"
)

// ObjectCreator is the func that creates the initial reference object, if the object doesn't exist in the cluster, this one is created.
Expand Down Expand Up @@ -262,6 +265,7 @@ func ServiceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
Spec: corev1.ServiceSpec{
Selector: lbl,
Ports: []corev1.ServicePort{{
Name: k8sServicePortName,
Protocol: corev1.ProtocolTCP,
Port: defaultHTTPServicePort,
TargetPort: variables.DefaultHTTPWorkflowPortIntStr,
Expand Down Expand Up @@ -446,3 +450,33 @@ func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *op
}
return workflowproj.CreateNewManagedPropsConfigMap(workflow, props), nil
}

// ServiceMonitorCreator is an ObjectCreator for the Prometheus ServiceMonitor of the workflow service.
//
// The ServiceMonitor is named after the workflow, is created in the workflow namespace and
// carries the workflow merged labels. It selects the services labelled with the workflow name
// and namespace, and scrapes the k8sServicePortName port at the k8sServicePortPath path.
func ServiceMonitorCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
	lbl := workflowproj.GetMergedLabels(workflow)

	serviceMonitor := &monv1.ServiceMonitor{
		ObjectMeta: metav1.ObjectMeta{
			Name:      workflow.Name,
			Namespace: workflow.Namespace,
			Labels:    lbl,
		},
		Spec: monv1.ServiceMonitorSpec{
			Selector: metav1.LabelSelector{
				MatchLabels: map[string]string{
					workflowproj.LabelWorkflow:          workflow.Name,
					workflowproj.LabelWorkflowNamespace: workflow.Namespace,
				},
			},
			Endpoints: []monv1.Endpoint{
				{
					// The port is referenced by name, so it must match the name of the service port.
					Port: k8sServicePortName,
					Path: k8sServicePortPath,
				},
			},
		},
	}
	return serviceMonitor, nil
}
8 changes: 8 additions & 0 deletions controllers/profiles/preview/deployment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func (d *DeploymentReconciler) ensureObjects(ctx context.Context, workflow *oper
}
objs = append(objs, eventingObjs...)

monitoringObjs, err := common.NewMonitoringEventingHandler(d.StateSupport).Ensure(ctx, workflow)
if err != nil {
workflow.Status.Manager().MarkFalse(api.RunningConditionType, api.WaitingForDeploymentReason, "Unable to deploy monitoring objects due to ", err)
_, _ = d.PerformStatusUpdate(ctx, workflow)
return reconcile.Result{}, nil, err
}

objs = append(objs, monitoringObjs...)
return reconcile.Result{}, objs, nil
}

Expand Down
55 changes: 55 additions & 0 deletions test/testdata/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Prometheus instance named "prometheus", running with the "prometheus" service account
# declared below. All of its selectors are left empty.
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
name: prometheus
spec:
serviceAccountName: prometheus
serviceMonitorNamespaceSelector: {}
serviceMonitorSelector: {}
podMonitorSelector: {}
resources:
requests:
memory: 400Mi
---
# Service account referenced by the Prometheus instance above.
apiVersion: v1
kind: ServiceAccount
metadata:
name: prometheus
---
# Cluster wide read permissions on the resources listed below and on the /metrics endpoint.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: prometheus
rules:
- apiGroups: [""]
resources:
- nodes
- nodes/metrics
- services
- endpoints
- pods
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources:
- configmaps
verbs: ["get"]
- apiGroups:
- networking.k8s.io
resources:
- ingresses
verbs: ["get", "list", "watch"]
- nonResourceURLs: ["/metrics"]
verbs: ["get"]
---
# Grants the "prometheus" cluster role to the "prometheus" service account of the default namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: prometheus
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prometheus
subjects:
- kind: ServiceAccount
name: prometheus
namespace: default

0 comments on commit 3c59b09

Please sign in to comment.