Skip to content

Commit

Permalink
move kafka.go to its own package
Browse files Browse the repository at this point in the history
  • Loading branch information
nasark committed Nov 2, 2023
1 parent 01a5414 commit f5cadb8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package miqtools
package miqkafka

import (
"bytes"
Expand Down
46 changes: 10 additions & 36 deletions manageiq-operator/internal/controller/manageiq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (

miqv1alpha1 "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1"
cr_migration "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/cr_migration"
miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components"
miqkafka "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components/kafka"
miqtool "github.com/ManageIQ/manageiq-pods/manageiq-operator/api/v1alpha1/helpers/miq-components"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -525,7 +525,7 @@ func (r *ManageIQReconciler) generatePostgresqlResources(cr *miqv1alpha1.ManageI
}

func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) error {
kafkaSubscription, mutateFunc := miqtool.KafkaInstall(cr, r.Scheme)
kafkaSubscription, mutateFunc := miqkafka.KafkaInstall(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaSubscription, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
Expand Down Expand Up @@ -562,40 +562,14 @@ func (r *ManageIQReconciler) generateKafkaResources(cr *miqv1alpha1.ManageIQ) er
logger.Info("Kafka User has been reconciled", "result", result)
}

kafkaService, mutateFunc := miqtool.KafkaService(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaService, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Service has been reconciled", "component", "kafka", "result", result)
}

zookeeperService, mutateFunc := miqtool.ZookeeperService(cr, r.Scheme)
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperService, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Service has been reconciled", "component", "zookeeper", "result", result)
}

kafkaDeployment, mutateFunc, err := miqtool.KafkaDeployment(cr, r.Client, r.Scheme)
if err != nil {
return err
}

if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaDeployment, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Deployment has been reconciled", "component", "kafka", "result", result)
}

zookeeperDeployment, mutateFunc, err := miqtool.ZookeeperDeployment(cr, r.Client, r.Scheme)
if err != nil {
return err
}

if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, zookeeperDeployment, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info("Deployment has been reconciled", "component", "zookeeper", "result", result)
topics := []string{"messaging-health-check", "manageiq.ems", "manageiq.ems-events", "manageiq.ems-inventory", "manageiq.metrics"}
for i := 0; i < len(topics); i++ {
kafkaTopicCR, mutateFunc := miqkafka.KafkaTopic(cr, r.Scheme, topics[i])
if result, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, kafkaTopicCR, mutateFunc); err != nil {
return err
} else if result != controllerutil.OperationResultNone {
logger.Info(fmt.Sprintf("Kafka topic %s has been reconciled", topics[i]))
}
}

return nil
Expand Down

0 comments on commit f5cadb8

Please sign in to comment.