Skip to content

Commit

Permalink
[issue-368] knative integration with DataIndex and JobService: fix wo…
Browse files Browse the repository at this point in the history
…rkflow deletion hanging issu
  • Loading branch information
jianrongzhang89 committed Sep 19, 2024
1 parent 3c59b09 commit 2b857b1
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 24 deletions.
10 changes: 7 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ KIND_VERSION ?= v0.20.0
KNATIVE_VERSION ?= v1.13.2
TIMEOUT_SECS ?= 180s
PROMETHEUS_VERSION ?= v0.70.0
GRAFANA_VERSION ?= v5.13.0

KNATIVE_SERVING_PREFIX ?= "https://github.com/knative/serving/releases/download/knative-$(KNATIVE_VERSION)"
KNATIVE_EVENTING_PREFIX ?= "https://github.com/knative/eventing/releases/download/knative-$(KNATIVE_VERSION)"
Expand Down Expand Up @@ -367,12 +368,15 @@ deploy-knative: create-cluster
kubectl wait --for=condition=Ready=True KnativeServing/knative-serving -n knative-serving --timeout=$(TIMEOUT_SECS)
kubectl wait --for=condition=Ready=True KnativeEventing/knative-eventing -n knative-eventing --timeout=$(TIMEOUT_SECS)

.PHONY: deploy-prometheus
deploy-prometheus: create-cluster
.PHONY: deploy-monitoring
deploy-monitoring: create-cluster
kubectl create -f https://github.com/prometheus-operator/prometheus-operator/releases/download/$(PROMETHEUS_VERSION)/bundle.yaml
kubectl wait --for=condition=Available=True deploy/prometheus-operator -n default --timeout=$(TIMEOUT_SECS)
kubectl apply -f ./test/testdata/prometheus.yaml
kubectl create -f https://github.com/grafana/grafana-operator/releases/$(GRAFANA_VERSION)/download/kustomize-cluster_scoped.yaml
kubectl wait --for=condition=Available=True deploy/grafana-operator-controller-manager -n grafana --timeout=$(TIMEOUT_SECS)
kubectl apply -f ./test/testdata/monitoring.yaml
kubectl wait --for=condition=Available=True prometheus/prometheus -n default --timeout=$(TIMEOUT_SECS)
kubectl wait --for=condition=Available=True deploy/grafana-deployment -n default --timeout=$(TIMEOUT_SECS)

.PHONY: delete-cluster
delete-cluster: install-kind
Expand Down
5 changes: 3 additions & 2 deletions controllers/knative/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type MonitoringAvailability struct {
}

const (
prometheusGroup = "prometheuses.monitoring.coreos.com"
prometheusGroup = "monitoring.coreos.com"
grafanaGroup = "grafana.integreatly.org"
)

func GetMonitoringAvailability(cfg *rest.Config) (*MonitoringAvailability, error) {
Expand All @@ -45,7 +46,7 @@ func GetMonitoringAvailability(cfg *rest.Config) (*MonitoringAvailability, error
if group.Name == prometheusGroup {
result.Prometheus = true
}
if group.Name == knativeEventingGroup {
if group.Name == grafanaGroup {
result.Grafana = true
}
}
Expand Down
26 changes: 14 additions & 12 deletions controllers/profiles/common/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type monitoringObjectManager struct {
func NewMonitoringEventingHandler(support *StateSupport) MonitoringEventingHandler {
return &monitoringObjectManager{
serviceMonitor: NewObjectEnsurer(support.C, ServiceMonitorCreator),
dataSource: NewObjectEnsurer(support.C, DataSourceCreator),
StateSupport: support,
}
}
Expand All @@ -60,17 +61,18 @@ func (k monitoringObjectManager) Ensure(ctx context.Context, workflow *operatora
} else if serviceMonitor != nil {
objs = append(objs, serviceMonitor)
}
/*
triggers := k.trigger.Ensure(ctx, workflow)
for _, trigger := range triggers {
if trigger.Error != nil {
return objs, trigger.Error
}
objs = append(objs, trigger.Object)
}
}
*/
return objs, nil
}
return nil, nil

if !MonitoringAvail.Grafana {
klog.V(log.I).InfoS("Grafana is not installed")
} else {
// create grafana data source
dataSource, _, err := k.dataSource.Ensure(ctx, workflow)
if err != nil {
return objs, err
} else if dataSource != nil {
objs = append(objs, dataSource)
}
}
return objs, nil
}
41 changes: 35 additions & 6 deletions controllers/profiles/common/object_creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"

monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
grafana "github.com/grafana/grafana-operator/v5/api/v1beta1"
prometheus "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
cncfmodel "github.com/serverlessworkflow/sdk-go/v2/model"

"github.com/imdario/mergo"
Expand Down Expand Up @@ -452,26 +453,54 @@ func ManagedPropsConfigMapCreator(workflow *operatorapi.SonataFlow, platform *op
}

// ServiceMonitorCreator is an ObjectsCreator for Service Monitor for the workflow service.
// It will create v1.SinkBinding based on events defined in workflow.
func ServiceMonitorCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)

// subject must be deployment to inject K_SINK, service won't work
serviceMonitor := &monv1.ServiceMonitor{
serviceMonitor := &prometheus.ServiceMonitor{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: prometheus.ServiceMonitorSpec{
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
workflowproj.LabelWorkflow: workflow.Name,
workflowproj.LabelWorkflowNamespace: workflow.Namespace,
},
},
Endpoints: []prometheus.Endpoint{
prometheus.Endpoint{
Port: k8sServicePortName,
Path: k8sServicePortPath,
},
},
},
}
return serviceMonitor, nil
}

// DataSourceCreator is an ObjectsCreator for the grafana data source for the workflow service.
func DataSourceCreator(workflow *operatorapi.SonataFlow) (client.Object, error) {
lbl := workflowproj.GetMergedLabels(workflow)

// subject must be deployment to inject K_SINK, service won't work
serviceMonitor := &grafana.ServiceMonitor{
ObjectMeta: metav1.ObjectMeta{
Name: workflow.Name,
Namespace: workflow.Namespace,
Labels: lbl,
},
Spec: monv1.ServiceMonitorSpec{
Spec: prometheus.ServiceMonitorSpec{
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
workflowproj.LabelWorkflow: workflow.Name,
workflowproj.LabelWorkflowNamespace: workflow.Namespace,
},
},
Endpoints: []monv1.Endpoint{
monv1.Endpoint{
Endpoints: []prometheus.Endpoint{
prometheus.Endpoint{
Port: k8sServicePortName,
Path: k8sServicePortPath,
},
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

monv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -63,6 +64,7 @@ func init() {
utilruntime.Must(sourcesv1.AddToScheme(scheme))
utilruntime.Must(eventingv1.AddToScheme(scheme))
utilruntime.Must(servingv1.AddToScheme(scheme))
utilruntime.Must(monv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

Expand Down
2 changes: 1 addition & 1 deletion operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27409,7 +27409,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: docker.io/apache/incubator-kie-sonataflow-operator:latest
image: quay.io/jianrzha/kogito-serverless-operator:0.0.104
livenessProbe:
httpGet:
path: /healthz
Expand Down
12 changes: 12 additions & 0 deletions test/testdata/prometheus.yaml → test/testdata/monitoring.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ spec:
requests:
memory: 400Mi
---
apiVersion: grafana.integreatly.org/v1beta1
kind: Grafana
metadata:
name: grafana
labels:
dashboards: "grafana"
spec:
config:
security:
admin_user: root
admin_password: secret
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down

0 comments on commit 2b857b1

Please sign in to comment.