Skip to content

Commit

Permalink
fix manager de not recreted after delete manager deploy
Browse files Browse the repository at this point in the history
Signed-off-by: ldpliu <[email protected]>
  • Loading branch information
ldpliu committed Dec 3, 2024
1 parent 8e573b9 commit 7a74b62
Show file tree
Hide file tree
Showing 17 changed files with 68 additions and 16 deletions.
4 changes: 2 additions & 2 deletions operator/pkg/config/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func NeedUpdateConditions(conditions []metav1.Condition,
func UpdateMGHComponent(ctx context.Context,
c client.Client,
desiredComponent v1alpha4.StatusCondition,
forceUpdate bool,
) error {
now := metav1.Time{Time: time.Now()}
desiredComponent.LastTransitionTime = now
Expand All @@ -248,8 +249,7 @@ func UpdateMGHComponent(ctx context.Context,
} else {
originComponent := curmgh.Status.Components[desiredComponent.Name]
originComponent.LastTransitionTime = now

if reflect.DeepEqual(desiredComponent, originComponent) {
if !forceUpdate && reflect.DeepEqual(desiredComponent, originComponent) {
return nil
}
}
Expand Down
4 changes: 3 additions & 1 deletion operator/pkg/config/storage_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,16 @@ func GetPGConnectionFromBuildInPostgres(ctx context.Context, client client.Clien

// SetStorageConnection update the postgres connection
func SetStorageConnection(conn *PostgresConnection) bool {
log.Debugf("Set Storage Connection: %v", conn == nil)
if conn != nil && !reflect.DeepEqual(conn, postgresConn) {
postgresConn = conn
log.Debugf("Update Storage Connection")
return true
}
return false
}

func GetStorageConnection() *PostgresConnection {
log.Debugf("Set Storage Connection: %v", postgresConn != nil)
log.Debugf("Get Storage Connection: %v", postgresConn != nil)
return postgresConn
}
12 changes: 9 additions & 3 deletions operator/pkg/config/transport_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"reflect"
"regexp"
"strings"

Expand Down Expand Up @@ -42,9 +43,14 @@ var (
inventoryConn *transport.RestfulConfig
)

func SetTransporterConn(conn *transport.KafkaConfig) {
log.Debug("Set Transporter Conn")
transporterConn = conn
func SetTransporterConn(conn *transport.KafkaConfig) bool {
log.Debug("set Transporter Conn")
if conn != nil && !reflect.DeepEqual(conn, transporterConn) {
transporterConn = conn
log.Debug("update Transporter Conn")
return true
}
return false
}

func GetTransporterConn() *transport.KafkaConfig {
Expand Down
3 changes: 3 additions & 0 deletions operator/pkg/controllers/acm/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (r *ACMResourceController) IsResourceRemoved() bool {
}

func (r *ACMResourceController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debugf("reconcile acm controller: %v", req)
r.Resources[req.Name] = true

if !r.readyToWatchACMResources() {
Expand Down Expand Up @@ -89,6 +90,7 @@ func (r *ACMResourceController) readyToWatchACMResources() bool {
}

func StartController(opts config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start acm controller")
if acmResourceController != nil {
return acmResourceController, nil
}
Expand Down Expand Up @@ -116,5 +118,6 @@ func StartController(opts config.ControllerOption) (config.ControllerInterface,
return nil, err
}
acmResourceController = acmController
log.Infof("inited acm controller")
return acmResourceController, nil
}
2 changes: 2 additions & 0 deletions operator/pkg/controllers/agent/addon_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func ReadyToEnableAddonManager(mgh *v1alpha4.MulticlusterGlobalHub) bool {
}

func StartAddonManagerController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start addon manager controller")

if addonManagerController != nil {
return addonManagerController, nil
}
Expand Down
5 changes: 4 additions & 1 deletion operator/pkg/controllers/agent/default_agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func NewDefaultAgentController(c client.Client) *DefaultAgentController {
}

func StartDefaultAgentController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start default agent controller")
if defaultAgentController != nil {
return defaultAgentController, nil
}
Expand Down Expand Up @@ -166,7 +167,7 @@ func StartDefaultAgentController(initOption config.ControllerOption) (config.Con
defaultAgentController = nil
return nil, err
}
log.Info("the default addon reconciler is started")
log.Info("the default agent controller is started")
return defaultAgentController, nil
}

Expand All @@ -176,6 +177,8 @@ func (c *DefaultAgentController) IsResourceRemoved() bool {
}

func (r *DefaultAgentController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debugf("reconcile default agent controller: %v", req)

mgh, err := config.GetMulticlusterGlobalHub(ctx, r.Client)
if err != nil {
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
Expand Down
5 changes: 4 additions & 1 deletion operator/pkg/controllers/agent/hosted_agent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
)

func StartHostedAgentController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start hosted agent controller")

if hostedAgentController != nil {
return hostedAgentController, nil
}
Expand All @@ -72,6 +74,7 @@ func StartHostedAgentController(initOption config.ControllerOption) (config.Cont
hostedAgentController = nil
return nil, err
}
log.Info("inited hosted agent controller")
return hostedAgentController, nil
}

Expand Down Expand Up @@ -138,7 +141,7 @@ var mghPred = predicate.Funcs{
}

func (r *HostedAgentController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debug("Reconcile ClusterManagementAddOn: %v", req.NamespacedName)
log.Debugf("reconcile ClusterManagementAddOn: %v", req.NamespacedName)
mgh, err := config.GetMulticlusterGlobalHub(ctx, r.c)
if err != nil {
log.Error(err)
Expand Down
2 changes: 2 additions & 0 deletions operator/pkg/controllers/backup/backup_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var log = logger.DefaultZapLogger()
// So for request.Namespace, we set it as request type, like "Secret","Configmap","MulticlusterGlobalHub" and so on.
// In the reconcile, we identy the request kind and get it by request.Name.
func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debugf("reconcile backup")

// mgh is used to update backup condition
mghList := &globalhubv1alpha4.MulticlusterGlobalHubList{}
err := r.Client.List(ctx, mghList)
Expand Down
2 changes: 2 additions & 0 deletions operator/pkg/controllers/backup/backup_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func GetBackupController() *BackupReconciler {
}

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Infof("start backup controller")
if backupController != nil {
return backupController, nil
}
Expand All @@ -85,6 +86,7 @@ func StartController(initOption config.ControllerOption) (config.ControllerInter
return nil, err
}
backupController = c
log.Infof("inited backup controller")
return backupController, nil
}

Expand Down
5 changes: 4 additions & 1 deletion operator/pkg/controllers/grafana/grafana_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (r *GrafanaReconciler) IsResourceRemoved() bool {
}

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start grafana controller")
if grafanaController != nil {
return grafanaController, nil
}
Expand All @@ -141,7 +142,7 @@ func StartController(initOption config.ControllerOption) (config.ControllerInter
grafanaController = nil
return grafanaController, err
}
log.Infof("inited grafana controller")
log.Infof("Inited grafana controller")
return grafanaController, nil
}

Expand Down Expand Up @@ -249,6 +250,7 @@ var secretPred = predicate.Funcs{
}

func (r *GrafanaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debugf("reconcile grafana controller")
mgh, err := config.GetMulticlusterGlobalHub(ctx, r.GetClient())
if err != nil {
return ctrl.Result{}, err
Expand All @@ -262,6 +264,7 @@ func (r *GrafanaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
err = config.UpdateMGHComponent(ctx, r.GetClient(),
config.GetComponentStatusWithReconcileError(ctx, r.GetClient(),
mgh.Namespace, config.COMPONENTS_GRAFANA_NAME, reconcileErr),
false,
)
if err != nil {
log.Errorf("failed to update mgh status, err:%v", err)
Expand Down
3 changes: 3 additions & 0 deletions operator/pkg/controllers/inventory/inventory_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (r *InventoryReconciler) IsResourceRemoved() bool {
}

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start inventory controller")
if inventoryReconciler != nil {
return inventoryReconciler, nil
}
Expand Down Expand Up @@ -128,6 +129,7 @@ func NewInventoryReconciler(mgr ctrl.Manager, kubeClient kubernetes.Interface) *
func (r *InventoryReconciler) Reconcile(ctx context.Context,
req ctrl.Request,
) (ctrl.Result, error) {
log.Debugf("reconcile inventory controller")
mgh, err := config.GetMulticlusterGlobalHub(ctx, r.GetClient())
if err != nil {
return ctrl.Result{}, nil
Expand All @@ -142,6 +144,7 @@ func (r *InventoryReconciler) Reconcile(ctx context.Context,
err = config.UpdateMGHComponent(ctx, r.GetClient(),
config.GetComponentStatusWithReconcileError(ctx, r.GetClient(),
mgh.Namespace, config.COMPONENTS_INVENTORY_API_NAME, reconcileErr),
false,
)
if err != nil {
log.Errorf("failed to update mgh status, err:%v", err)
Expand Down
3 changes: 3 additions & 0 deletions operator/pkg/controllers/managedhub/managedhub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (r *ManagedHubController) IsResourceRemoved() bool {
}

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start managedhub controller")
if managedHubController != nil {
return managedHubController, nil
}
Expand All @@ -72,6 +73,8 @@ func StartController(initOption config.ControllerOption) (config.ControllerInter
}

func (r *ManagedHubController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debugf("reconcile managedhub controller")

mgh, err := config.GetMulticlusterGlobalHub(ctx, r.c)
if err != nil {
return ctrl.Result{}, nil
Expand Down
6 changes: 5 additions & 1 deletion operator/pkg/controllers/manager/manager_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ var (
)

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start manager controller")

if managerController != nil {
return managerController, nil
}
Expand All @@ -107,7 +109,7 @@ func StartController(initOption config.ControllerOption) (config.ControllerInter
}

func (r *ManagerReconciler) IsResourceRemoved() bool {
log.Infof("ManagerController resource removed: %v", isResourceRemoved)
log.Infof("managerController resource removed: %v", isResourceRemoved)
return isResourceRemoved
}

Expand Down Expand Up @@ -149,6 +151,7 @@ func NewManagerReconciler(mgr ctrl.Manager, kubeClient kubernetes.Interface,
func (r *ManagerReconciler) Reconcile(ctx context.Context,
req ctrl.Request,
) (ctrl.Result, error) {
log.Debug("reconcile manager controller")
mgh, err := config.GetMulticlusterGlobalHub(ctx, r.GetClient())
if err != nil {
return ctrl.Result{}, nil
Expand All @@ -170,6 +173,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context,
err = config.UpdateMGHComponent(ctx, r.GetClient(),
config.GetComponentStatusWithReconcileError(ctx, r.GetClient(),
mgh.Namespace, config.COMPONENTS_MANAGER_NAME, reconcileErr),
false,
)
if err != nil {
log.Errorf("failed to update mgh status, err:%v", err)
Expand Down
12 changes: 10 additions & 2 deletions operator/pkg/controllers/storage/storage_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,18 @@ var WatchedConfigMap = sets.NewString(
constants.PostgresCAConfigMap,
)

var storageReconciler *StorageReconciler
var (
storageReconciler *StorageReconciler
updateConnection bool
)

func (r *StorageReconciler) IsResourceRemoved() bool {
return true
}

func StartController(initOption config.ControllerOption) (config.ControllerInterface, error) {
log.Info("start storage controller")

if storageReconciler != nil {
return storageReconciler, nil
}
Expand Down Expand Up @@ -158,6 +163,7 @@ var secretPred = predicate.Funcs{
}

func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log.Debug("reconcile storage controller")
mgh, err := config.GetMulticlusterGlobalHub(ctx, r.GetClient())
if err != nil {
return ctrl.Result{}, err
Expand All @@ -183,6 +189,7 @@ func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
defer func() {
err = config.UpdateMGHComponent(ctx, r.GetClient(),
getDatabaseComponentStatus(ctx, r.GetClient(), mgh.Namespace, reconcileErr),
updateConnection,
)
if err != nil {
log.Errorf("failed to update mgh status, err:%v", err)
Expand All @@ -193,7 +200,7 @@ func (r *StorageReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
reconcileErr = fmt.Errorf("storage not ready, Error: %v", err)
return ctrl.Result{}, reconcileErr
}
_ = config.SetStorageConnection(storageConn)
updateConnection = config.SetStorageConnection(storageConn)

needRequeue, err := r.reconcileDatabase(ctx, mgh)
if err != nil {
Expand Down Expand Up @@ -288,6 +295,7 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4
log.Infof("wait database ready")
return true, nil
}

defer func() {
if err := conn.Close(ctx); err != nil {
log.Error(err, "failed to close connection to database")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var manifests embed.FS
var (
startedKafkaController = false
isResourceRemoved = false
updateConn bool
)

var log = logger.DefaultZapLogger()
Expand Down Expand Up @@ -91,6 +92,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) (
defer func() {
err = config.UpdateMGHComponent(ctx, r.c,
r.getKafkaComponentStatus(reconcileErr, r.kafkaStatus),
updateConn,
)
if err != nil {
log.Errorf("failed to update mgh status, err:%v", err)
Expand Down Expand Up @@ -128,7 +130,7 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) (
if needRequeue {
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
config.SetTransporterConn(conn)
updateConn = config.SetTransporterConn(conn)

return ctrl.Result{}, nil
}
Expand All @@ -146,6 +148,7 @@ var kafkaPred = predicate.Funcs{
}

func StartKafkaController(ctx context.Context, mgr ctrl.Manager, transporter transport.Transporter) error {
log.Info("start kafka controller")
if startedKafkaController {
return nil
}
Expand Down
Loading

0 comments on commit 7a74b62

Please sign in to comment.