From 873cec1a478b0be2528898173c2993d6c0b7a7d8 Mon Sep 17 00:00:00 2001 From: Nasar Khan Date: Mon, 23 Oct 2023 16:35:22 -0400 Subject: [PATCH] replace bitnami with strimzi kafka --- .../v1alpha1/helpers/miq-components/kafka.go | 634 ++++++++---------- .../miq-components/network_policies.go | 2 +- .../helpers/miq-components/orchestrator.go | 26 +- manageiq-operator/config/manager/manager.yaml | 2 +- manageiq-operator/config/rbac/role.yaml | 28 + .../config/samples/_v1alpha1_manageiq.yaml | 2 + .../controller/manageiq_controller.go | 68 +- 7 files changed, 353 insertions(+), 409 deletions(-) diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go index ba023c96e..88949a3f3 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka.go @@ -1,405 +1,351 @@ package miqtools import ( - "context" - miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - resource "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - intstr "k8s.io/apimachinery/pkg/util/intstr" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func ManageKafkaSecret(cr *miqv1alpha1.ManageIQ, client client.Client, scheme *runtime.Scheme) (*corev1.Secret, controllerutil.MutateFn) { - secretKey := types.NamespacedName{Namespace: cr.ObjectMeta.Namespace, Name: cr.Spec.KafkaSecret} - secret := &corev1.Secret{} - secretErr := client.Get(context.TODO(), secretKey, secret) - if secretErr != nil { - secret = defaultKafkaSecret(cr) - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, secret, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &secret.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &secret.ObjectMeta) - - return nil - } - - return secret, f -} - -func defaultKafkaSecret(cr *miqv1alpha1.ManageIQ) *corev1.Secret { - secretData := map[string]string{ - "username": "root", - "password": generatePassword(), - "hostname": "kafka", - } - - secret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: kafkaSecretName(cr), - Namespace: cr.ObjectMeta.Namespace, +func KafkaClusterSpec() (map[string]interface{}) { + return map[string]interface{}{ + "kafka": map[string]interface{}{ + "replicas": 1, + "listeners": []map[string]interface{}{ + map[string]interface{}{ + "name": "kafka", + "port": 9093, + "type": "internal", + "tls": true, + "authentication": map[string]interface{}{ + "type": "scram-sha-512", + }, + }, + }, + "config": map[string]interface{}{ + "offsets.topic.replication.factor": 1, + "transaction.state.log.replication.factor": 1, + "transaction.state.log.min.isr": 1, + "default.replication.factor": 1, + "min.insync.replicas": 1, + }, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "kafkaContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, + }, + "authorization": map[string]interface{}{ + "type": "simple", + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{}, + "limits": map[string]interface{}{}, + }, }, - StringData: secretData, - } - - addAppLabel(cr.Spec.AppName, &secret.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &secret.ObjectMeta) - - return secret -} - -func kafkaSecretName(cr *miqv1alpha1.ManageIQ) string { - secretName := "kafka-secrets" - if cr.Spec.KafkaSecret != "" { - secretName = cr.Spec.KafkaSecret - } - - return secretName -} - -func KafkaPVC(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.PersistentVolumeClaim, controllerutil.MutateFn) { - storageReq, _ := resource.ParseQuantity(cr.Spec.KafkaVolumeCapacity) - - resources := corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - "storage": storageReq, + "zookeeper": map[string]interface{}{ + "replicas": 1, + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "zookeeperContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "storage": map[string]interface{}{ + "type": "persistent-claim", + "deleteClaim": true, + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{}, + "limits": map[string]interface{}{}, + }, }, + "entityOperator": map[string]interface{}{ + "template": map[string]interface{}{ + "pod": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "runAsNonRoot": true, + }, + }, + "topicOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "userOperatorContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + "tlsSidecarContainer": map[string]interface{}{ + "securityContext": map[string]interface{}{ + "allowPrivilegeEscalation": false, + "capabilities": map[string]interface{}{ + "drop": []string{"ALL"}, + }, + "privileged": false, + "readOnlyRootFilesystem": false, + "runAsNonRoot": true, + }, + }, + }, + "tlsSidecar": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + "limits": map[string]interface{}{ + "cpu": "500m", + "memory": "128Mi", + }, + }, + }, + "userOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + }, + }, + "topicOperator": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + "limits": map[string]interface{}{ + "cpu": 1, + "memory": "1Gi", + }, + }, + }, + }, + // "clusterCa": map[string]interface{}{ + // "generateCertificateAuthority": false, + // }, } +} - accessModes := []corev1.PersistentVolumeAccessMode{ - "ReadWriteOnce", - } - - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka-data", - Namespace: cr.ObjectMeta.Namespace, - }, +func KafkaCluster(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaClusterCR := &unstructured.Unstructured{} + kafkaClusterCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "Kafka", + Version: "v1beta2", + }) + kafkaClusterCR.SetName("manageiq") + kafkaClusterCR.SetNamespace(cr.Namespace) + + kafkaCRSpec := KafkaClusterSpec() + + if cr.Spec.StorageClassName != "" { + kafkaStorage := kafkaCRSpec["kafka"].(map[string]interface{})["storage"].(map[string]interface{}) + kafkaStorage["class"] = cr.Spec.StorageClassName + zookeeperStorage := kafkaCRSpec["zookeeper"].(map[string]interface{})["storage"].(map[string]interface{}) + zookeeperStorage["class"] = cr.Spec.StorageClassName } - f := func() error { - if err := controllerutil.SetControllerReference(cr, pvc, scheme); err != nil { + kafkaResourceRequests := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + kafkaResourceRequests["memory"] = "1Gi" + kafkaResourceRequests["cpu"] = "200m" + kafkaResourceLimits := kafkaCRSpec["kafka"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + kafkaResourceLimits["memory"] = "2Gi" + kafkaResourceLimits["cpu"] = "400m" + + zookeeperResourceRequests := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}) + zookeeperResourceRequests["memory"] = "256Mi" + zookeeperResourceRequests["cpu"] = "150m" + zookeeperResourceLimits := kafkaCRSpec["zookeeper"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}) + zookeeperResourceLimits["memory"] = "512Mi" + zookeeperResourceLimits["cpu"] = "250m" + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaClusterCR, scheme); err != nil { return err } - addAppLabel(cr.Spec.AppName, &pvc.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &pvc.ObjectMeta) - pvc.Spec.AccessModes = accessModes - pvc.Spec.Resources = resources + kafkaStorage := kafkaCRSpec["kafka"].(map[string]interface{})["storage"].(map[string]interface{}) + kafkaStorage["size"] = cr.Spec.KafkaVolumeCapacity + + zookeeperStorage := kafkaCRSpec["zookeeper"].(map[string]interface{})["storage"].(map[string]interface{}) + zookeeperStorage["size"] = cr.Spec.ZookeeperVolumeCapacity + + kafkaClusterCR.UnstructuredContent()["spec"] = kafkaCRSpec - if cr.Spec.StorageClassName != "" { - pvc.Spec.StorageClassName = &cr.Spec.StorageClassName - } return nil } - return pvc, f + return kafkaClusterCR, mutateFunc } -func ZookeeperPVC(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.PersistentVolumeClaim, controllerutil.MutateFn) { - storageReq, _ := resource.ParseQuantity(cr.Spec.DatabaseVolumeCapacity) - - resources := corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - "storage": storageReq, +func KafkaUserSpec() (map[string]interface{}) { + return map[string]interface{}{ + "authentication": map[string]interface{}{ + "type": "scram-sha-512", + }, + "authorization": map[string]interface{}{ + "type": "simple", + "acls": []map[string]interface{}{ + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "topic", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, + map[string]interface{}{ + "resource": map[string]interface{}{ + "type": "group", + "name": "*", + "patternType": "literal", + }, + "operations": []string{"All"}, + "host": "*", + }, + }, }, } +} - accessModes := []corev1.PersistentVolumeAccessMode{ - "ReadWriteOnce", - } +func KafkaUser(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaUserCR := &unstructured.Unstructured{} - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper-data", - Namespace: cr.ObjectMeta.Namespace, - }, - } + kafkaUserCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaUser", + Version: "v1beta2", + }) + kafkaUserCR.SetName("manageiq-user") + kafkaUserCR.SetNamespace(cr.Namespace) + kafkaUserCR.SetLabels(map[string]string{"strimzi.io/cluster": "manageiq"}) - f := func() error { - if err := controllerutil.SetControllerReference(cr, pvc, scheme); err != nil { + kafkaUserSpec := KafkaUserSpec() + + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaUserCR, scheme); err != nil { return err } - addAppLabel(cr.Spec.AppName, &pvc.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &pvc.ObjectMeta) - pvc.Spec.AccessModes = accessModes - pvc.Spec.Resources = resources + kafkaUserCR.UnstructuredContent()["spec"] = kafkaUserSpec - if cr.Spec.StorageClassName != "" { - pvc.Spec.StorageClassName = &cr.Spec.StorageClassName - } return nil } - return pvc, f + return kafkaUserCR, mutateFunc } -func KafkaService(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.Service, controllerutil.MutateFn) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka", - Namespace: cr.ObjectMeta.Namespace, +func KafkaTopicSpec() (map[string]interface{}) { + return map[string]interface{}{ + "partitions": 1, + "config": map[string]interface{}{ + "retention.ms": 7200000, + "segment.bytes": 1073741824, }, } +} - f := func() error { - if err := controllerutil.SetControllerReference(cr, service, scheme); err != nil { - return err - } - - addAppLabel(cr.Spec.AppName, &service.ObjectMeta) - if len(service.Spec.Ports) == 0 { - service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) - } - service.Spec.Ports[0].Name = "kafka" - service.Spec.Ports[0].Port = 9092 - service.Spec.Selector = map[string]string{"name": "kafka"} - return nil - } +func KafkaTopic(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, topicName string) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaTopicCR := &unstructured.Unstructured{} - return service, f -} + kafkaTopicCR.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "kafka.strimzi.io", + Kind: "KafkaTopic", + Version: "v1beta2", + }) + kafkaTopicCR.SetName(topicName) + kafkaTopicCR.SetNamespace(cr.Namespace) + kafkaTopicCR.SetLabels(map[string]string{"strimzi.io/cluster": "manageiq"}) -func ZookeeperService(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*corev1.Service, controllerutil.MutateFn) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper", - Namespace: cr.ObjectMeta.Namespace, - }, - } + kafkaTopicSpec := KafkaTopicSpec() - f := func() error { - if err := controllerutil.SetControllerReference(cr, service, scheme); err != nil { + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaTopicCR, scheme); err != nil { return err } - addAppLabel(cr.Spec.AppName, &service.ObjectMeta) - if len(service.Spec.Ports) == 0 { - service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) - } - service.Spec.Ports[0].Name = "zookeeper" - service.Spec.Ports[0].Port = 2181 - service.Spec.Selector = map[string]string{"name": "zookeeper"} + kafkaTopicCR.UnstructuredContent()["spec"] = kafkaTopicSpec + return nil } - return service, f + return kafkaTopicCR, mutateFunc } -func KafkaDeployment(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*appsv1.Deployment, controllerutil.MutateFn, error) { - deploymentLabels := map[string]string{ - "name": "kafka", - "app": cr.Spec.AppName, - } - - container := corev1.Container{ - Name: "kafka", - Image: cr.Spec.KafkaImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Ports: []corev1.ContainerPort{ - corev1.ContainerPort{ - ContainerPort: 9092, - }, - }, - LivenessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(9092), - }, - }, - }, - ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - TCPSocket: &corev1.TCPSocketAction{ - Port: intstr.FromInt(9092), - }, - }, - }, - Env: []corev1.EnvVar{ - corev1.EnvVar{ - Name: "KAFKA_BROKER_USER", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "username", - }, - }, - }, - corev1.EnvVar{ - Name: "KAFKA_BROKER_PASSWORD", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "password", - }, - }, - }, - corev1.EnvVar{ - Name: "KAFKA_ZOOKEEPER_CONNECT", - Value: "zookeeper:2181", - }, - corev1.EnvVar{ - Name: "ALLOW_PLAINTEXT_LISTENER", - Value: "yes", - }, - corev1.EnvVar{ - Name: "KAFKA_CFG_ADVERTISED_LISTENERS", - Value: "PLAINTEXT://kafka:9092", - }, - }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "kafka-data", MountPath: "/bitnami/kafka"}, - }, - } - - err := addResourceReqs(cr.Spec.KafkaMemoryLimit, cr.Spec.KafkaMemoryRequest, cr.Spec.KafkaCpuLimit, cr.Spec.KafkaCpuRequest, &container) - if err != nil { - return nil, nil, err +func KafkaInstall(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*unstructured.Unstructured, controllerutil.MutateFn) { + kafkaSubscription := &unstructured.Unstructured{} + kafkaSubscription.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "operators.coreos.com", + Kind: "Subscription", + Version: "v1alpha1", + }) + kafkaSubscription.SetName("strimzi-kafka-operator") + kafkaSubscription.SetNamespace(cr.Namespace) + + kafkaSubscriptionSpec := map[string]interface{}{ + "channel": "strimzi-0.35.x", + "name": "strimzi-kafka-operator", + "source": "community-operators", + "sourceNamespace": "openshift-marketplace", } - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "kafka", - Namespace: cr.ObjectMeta.Namespace, - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "kafka", - }, - Spec: corev1.PodSpec{}, - }, - }, - } - - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { + mutateFunc := func() error { + if err := controllerutil.SetControllerReference(cr, kafkaSubscription, scheme); err != nil { return err } - addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) - addBackupAnnotation("kafka-data", &deployment.Spec.Template.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.Spec.Template.ObjectMeta) - var repNum int32 = 1 - deployment.Spec.Replicas = &repNum - deployment.Spec.Strategy = appsv1.DeploymentStrategy{ - Type: "Recreate", - } - deployment.Spec.Template.Spec.Containers = []corev1.Container{container} - deployment.Spec.Template.Spec.Containers[0].SecurityContext = DefaultSecurityContext() - deployment.Spec.Template.Spec.ServiceAccountName = defaultServiceAccountName(cr.Spec.AppName) - var termSecs int64 = 10 - deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = &termSecs - deployment.Spec.Template.Spec.Volumes = []corev1.Volume{ - corev1.Volume{ - Name: "kafka-data", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "kafka-data", - }, - }, - }, - } - return nil - } - - return deployment, f, nil -} - -func ZookeeperDeployment(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme) (*appsv1.Deployment, controllerutil.MutateFn, error) { - deploymentLabels := map[string]string{ - "name": "zookeeper", - "app": cr.Spec.AppName, - } - - container := corev1.Container{ - Name: "zookeeper", - Image: cr.Spec.ZookeeperImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Ports: []corev1.ContainerPort{ - corev1.ContainerPort{ - ContainerPort: 2181, - }, - }, - Env: []corev1.EnvVar{ - corev1.EnvVar{ - Name: "ALLOW_ANONYMOUS_LOGIN", - Value: "yes", - }, - }, - VolumeMounts: []corev1.VolumeMount{ - corev1.VolumeMount{Name: "zookeeper-data", MountPath: "/bitnami/zookeeper"}, - }, - } - err := addResourceReqs(cr.Spec.ZookeeperMemoryLimit, cr.Spec.ZookeeperMemoryRequest, cr.Spec.ZookeeperCpuLimit, cr.Spec.ZookeeperCpuRequest, &container) - if err != nil { - return nil, nil, err - } - - deployment := &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: "zookeeper", - Namespace: cr.ObjectMeta.Namespace, - }, - Spec: appsv1.DeploymentSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: deploymentLabels, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: deploymentLabels, - Name: "zookeeper", - }, - Spec: corev1.PodSpec{}, - }, - }, - } + kafkaSubscription.UnstructuredContent()["spec"] = kafkaSubscriptionSpec - f := func() error { - if err := controllerutil.SetControllerReference(cr, deployment, scheme); err != nil { - return err - } - addAppLabel(cr.Spec.AppName, &deployment.ObjectMeta) - addBackupAnnotation("zookeeper-data", &deployment.Spec.Template.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.ObjectMeta) - addBackupLabel(cr.Spec.BackupLabelName, &deployment.Spec.Template.ObjectMeta) - var repNum int32 = 1 - deployment.Spec.Replicas = &repNum - deployment.Spec.Strategy = appsv1.DeploymentStrategy{ - Type: "Recreate", - } - addAnnotations(cr.Spec.AppAnnotations, &deployment.Spec.Template.ObjectMeta) - deployment.Spec.Template.Spec.Containers = []corev1.Container{container} - deployment.Spec.Template.Spec.Containers[0].SecurityContext = DefaultSecurityContext() - deployment.Spec.Template.Spec.ServiceAccountName = defaultServiceAccountName(cr.Spec.AppName) - deployment.Spec.Template.Spec.Volumes = []corev1.Volume{ - corev1.Volume{ - Name: "zookeeper-data", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "zookeeper-data", - }, - }, - }, - } return nil } - return deployment, f, nil + return kafkaSubscription, mutateFunc } diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go index 44aba3488..48e555cd1 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/network_policies.go @@ -227,7 +227,7 @@ func NetworkPolicyAllowKafka(cr *miqv1alpha1.ManageIQ, scheme *runtime.Scheme, c addAppLabel(cr.Spec.AppName, &networkPolicy.ObjectMeta) setIngressPolicyType(networkPolicy) - networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"name": "kafka"} + networkPolicy.Spec.PodSelector.MatchLabels = map[string]string{"strimzi.io/pod-name": "manageiq-kafka-0"} pod := orchestratorPod(*c) if pod == nil { diff --git a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go index 099ccb60a..d9735302d 100644 --- a/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go +++ b/manageiq-operator/api/v1alpha1/helpers/miq-components/orchestrator.go @@ -118,25 +118,20 @@ func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container) { messagingEnv := []corev1.EnvVar{ corev1.EnvVar{ Name: "MESSAGING_HOSTNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "hostname", - }, - }, + Value: "manageiq-kafka-bootstrap", }, corev1.EnvVar{ Name: "MESSAGING_PASSWORD", ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, + LocalObjectReference: corev1.LocalObjectReference{Name: "manageiq-user"}, Key: "password", }, }, }, corev1.EnvVar{ Name: "MESSAGING_PORT", - Value: "9092", + Value: "9093", }, corev1.EnvVar{ Name: "MESSAGING_TYPE", @@ -144,12 +139,15 @@ func addMessagingEnv(cr *miqv1alpha1.ManageIQ, c *corev1.Container) { }, corev1.EnvVar{ Name: "MESSAGING_USERNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: kafkaSecretName(cr)}, - Key: "username", - }, - }, + Value: "manageiq-user", + }, + corev1.EnvVar{ + Name: "MESSAGING_SASL_MECHANISM", + Value: "SCRAM-SHA-512", + }, + corev1.EnvVar{ + Name: "MESSAGING_SSL_CA", + Value: "/etc/pki/ca-trust/source/anchors/root.crt", }, } diff --git a/manageiq-operator/config/manager/manager.yaml b/manageiq-operator/config/manager/manager.yaml index 9c2dd36ea..b12b26085 100644 --- a/manageiq-operator/config/manager/manager.yaml +++ b/manageiq-operator/config/manager/manager.yaml @@ -15,7 +15,7 @@ spec: serviceAccountName: manageiq-operator containers: - name: manageiq-operator - image: docker.io/manageiq/manageiq-operator:latest + image: docker.io/nasark/manageiq-operator:latest imagePullPolicy: Always env: - name: WATCH_NAMESPACE diff --git a/manageiq-operator/config/rbac/role.yaml b/manageiq-operator/config/rbac/role.yaml index 150a86e89..77623fc0d 100644 --- a/manageiq-operator/config/rbac/role.yaml +++ b/manageiq-operator/config/rbac/role.yaml @@ -2,7 +2,9 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: + creationTimestamp: null name: manageiq-operator + namespace: miq rules: - apiGroups: - "" @@ -76,6 +78,32 @@ rules: - patch - update - watch +- apiGroups: + - kafka.strimzi.io + resources: + - kafkas + - kafkatopics + - kafkausers + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - operators.coreos.com + resources: + - subscriptions + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - manageiq.org resources: diff --git a/manageiq-operator/config/samples/_v1alpha1_manageiq.yaml b/manageiq-operator/config/samples/_v1alpha1_manageiq.yaml index 131423eb5..61496461c 100644 --- a/manageiq-operator/config/samples/_v1alpha1_manageiq.yaml +++ b/manageiq-operator/config/samples/_v1alpha1_manageiq.yaml @@ -4,3 +4,5 @@ metadata: name: manageiq-sample spec: applicationDomain: miqproject.apps-crc.testing + # internalCertificatesSecret: internal-certificates-secret + # postgresqlImage: docker.io/bdunne/postgresql:13-ssl_pr6 \ No newline at end of file diff --git a/manageiq-operator/internal/controller/manageiq_controller.go b/manageiq-operator/internal/controller/manageiq_controller.go index 4f92c7e15..bbb26060c 100644 --- a/manageiq-operator/internal/controller/manageiq_controller.go +++ b/manageiq-operator/internal/controller/manageiq_controller.go @@ -52,11 +52,13 @@ type ManageIQReconciler struct { //+kubebuilder:rbac:namespace=changeme,groups=apps,resources=deployments/finalizers,resourceNames=manageiq-operator,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update;delete //+kubebuilder:rbac:namespace=changeme,groups=extensions,resources=deployments;deployments/scale;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=kafka.strimzi.io,resources=kafkas;kafkausers;kafkatopics,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/finalizers,verbs=update //+kubebuilder:rbac:namespace=changeme,groups=manageiq.org,resources=manageiqs/status,verbs=get;update;patch //+kubebuilder:rbac:namespace=changeme,groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;create //+kubebuilder:rbac:namespace=changeme,groups=networking.k8s.io,resources=ingresses;networkpolicies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:namespace=changeme,groups=operators.coreos.com,resources=subscriptions,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=rbac.authorization.k8s.io,resources=rolebindings;roles,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:namespace=changeme,groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;watch;create;update;patch;delete @@ -522,67 +524,35 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI } func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error { - secret, mutateFunc := miqtool.ManageKafkaSecret(cr, r.Client, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, secret, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Secret has been reconciled", "component", "kafka", "result", result) - } - - hostName := getSecretKeyValue(r.Client, cr.Namespace, cr.Spec.KafkaSecret, "hostname") - if hostName != "" { - logger.Info("External Kafka Messaging Service selected, skipping kafka and zookeeper service reconciliation", "hostname", hostName) - return nil - } - - kafkaPVC, mutateFunc := miqtool.KafkaPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaPVC, mutateFunc); err != nil { + kafkaSubscription, mutateFunc := miqtool.KafkaInstall(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "kafka", "result", result) + logger.Info("Kafka Subscription has been reconciled", "result", result) } - zookeeperPVC, mutateFunc := miqtool.ZookeeperPVC(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperPVC, mutateFunc); err != nil { + kafkaClusterCR, mutateFunc := miqtool.KafkaCluster(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaClusterCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("PVC has been reconciled", "component", "zookeeper", "result", result) + logger.Info("Kafka Cluster has been reconciled", "result", result) } - kafkaService, mutateFunc := miqtool.KafkaService(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaService, mutateFunc); err != nil { + kafkaUserCR, mutateFunc := miqtool.KafkaUser(cr, r.Scheme) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaUserCR, mutateFunc); err != nil { return err } else if result != controllerutil.OperationResultNone { - logger.Info("Service has been reconciled", "component", "kafka", "result", result) + logger.Info("Kafka User has been reconciled", "result", result) } - zookeeperService, mutateFunc := miqtool.ZookeeperService(cr, r.Scheme) - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperService, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Service has been reconciled", "component", "zookeeper", "result", result) - } - - kafkaDeployment, mutateFunc, err := miqtool.KafkaDeployment(cr, r.Scheme) - if err != nil { - return err - } - - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaDeployment, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Deployment has been reconciled", "component", "kafka", "result", result) - } - - zookeeperDeployment, mutateFunc, err := miqtool.ZookeeperDeployment(cr, r.Scheme) - if err != nil { - return err - } - - if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperDeployment, mutateFunc); err != nil { - return err - } else if result != controllerutil.OperationResultNone { - logger.Info("Deployment has been reconciled", "component", "zookeeper", "result", result) + topics := []string{"messaging-health-check", "manageiq.ems", "manageiq.ems-events", "manageiq.ems-inventory", "manageiq.metrics"} + for i := 0; i < len(topics); i++ { + kafkaTopicCR, mutateFunc := miqtool.KafkaTopic(cr, r.Scheme, topics[i]) + if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaTopicCR, mutateFunc); err != nil { + return err + } else if result != controllerutil.OperationResultNone { + logger.Info(fmt.Sprintf("Kafka topic %s has been reconciled", topics[i])) + } } return nil